Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/21 16:43:40 UTC

[kafka] branch trunk updated: KAFKA-6573: Update brokerInfo in KafkaController on listener update (#4603)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 90e0bbe  KAFKA-6573: Update brokerInfo in KafkaController on listener update (#4603)
90e0bbe is described below

commit 90e0bbec94dd85e1c5b1af0b6426df0a02e5da3f
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Wed Feb 21 16:43:25 2018 +0000

    KAFKA-6573: Update brokerInfo in KafkaController on listener update (#4603)
    
    Update `KafkaController.brokerInfo` when listeners are updated, since this value is used to re-register the broker in ZooKeeper if the ZK session expires. Also added a test to verify the values in ZK after session expiry.
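
For readers skimming the patch, here is a minimal, hypothetical sketch of the failure mode this commit addresses (simplified stand-in names, not the actual Kafka classes): the controller re-registers its cached broker info in ZooKeeper when the session is re-established, so a dynamic listener update that only writes to ZK without refreshing that cache would be silently reverted after the next session expiry.

    // Hypothetical stand-ins for KafkaController/KafkaZkClient, only to illustrate the fix.
    case class BrokerInfo(brokerId: Int, endPoints: Seq[String])

    class ZkStore {
      // Stand-in for creating/overwriting the broker's ephemeral registration znode.
      def registerBroker(info: BrokerInfo): Unit =
        println(s"broker ${info.brokerId} registered with ${info.endPoints.mkString(",")}")
    }

    class ControllerStub(zk: ZkStore, initial: BrokerInfo) {
      @volatile private var brokerInfo: BrokerInfo = initial

      // The fix: refresh the cached value *and* the znode together (cf. updateBrokerInfo below).
      def updateBrokerInfo(newInfo: BrokerInfo): Unit = {
        brokerInfo = newInfo
        zk.registerBroker(newInfo)
      }

      // On session re-establishment the ephemeral znode is recreated from the cache;
      // if the cache were stale, the old listeners would reappear in ZooKeeper.
      def onSessionReestablished(): Unit = zk.registerBroker(brokerInfo)
    }

    object ListenerUpdateDemo extends App {
      val zk = new ZkStore
      val controller = new ControllerStub(zk, BrokerInfo(0, Seq("PLAINTEXT://localhost:9092")))
      controller.updateBrokerInfo(BrokerInfo(0, Seq("PLAINTEXT://localhost:9092", "SSL://localhost:9093")))
      controller.onSessionReestablished() // re-registers the updated endpoints, not the stale ones
    }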
---
 .../scala/kafka/controller/KafkaController.scala    |  5 +++++
 .../scala/kafka/server/DynamicBrokerConfig.scala    |  2 +-
 core/src/main/scala/kafka/zk/KafkaZkClient.scala    |  5 ++++-
 .../scala/kafka/zookeeper/ZooKeeperClient.scala     |  2 +-
 .../server/DynamicBrokerReconfigurationTest.scala   | 21 +++++++++++++++++++++
 .../scala/unit/kafka/zk/ZooKeeperTestHarness.scala  | 13 +++++++++++++
 .../unit/kafka/zookeeper/ZooKeeperClientTest.scala  | 13 +++----------
 7 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f36fc79..a8707ad 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -190,6 +190,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     eventManager.put(controlledShutdownEvent)
   }
 
+  private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = {
+    this.brokerInfo = newBrokerInfo
+    zkClient.updateBrokerInfoInZk(newBrokerInfo)
+  }
+
   private def state: ControllerState = eventManager.state
 
   /**
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index ce4b9e7..3236af0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -751,7 +751,7 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
     if (listenersAdded.nonEmpty)
       server.socketServer.addListeners(listenersAdded)
 
-    server.zkClient.updateBrokerInfoInZk(server.createBrokerInfo)
+    server.kafkaController.updateBrokerInfo(server.createBrokerInfo)
   }
 
   private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index afc8202..6545fde 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, mutable}
@@ -61,6 +61,9 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   import KafkaZkClient._
 
+  // Only for testing
+  private[kafka] def currentZooKeeper: ZooKeeper = zooKeeperClient.currentZooKeeper
+
   /**
    * Create a sequential persistent path. That is, the znode will not be automatically deleted upon client's disconnect
    * and a monotonically increasing number will be appended to its name.
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 3934fd0..efbd6e8 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -317,7 +317,7 @@ class ZooKeeperClient(connectString: String,
   }
 
   // Only for testing
-  private[zookeeper] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) {
+  private[kafka] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) {
     zooKeeper
   }
   
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index ee38762..f0bd61a 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -599,6 +599,26 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val invalidHost = "192.168.0.1"
     alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost)
 
+    def validateEndpointsInZooKeeper(server: KafkaServer, endpointMatcher: String => Boolean): Unit = {
+      val brokerInfo = zkClient.getBroker(server.config.brokerId)
+      assertTrue("Broker not registered", brokerInfo.nonEmpty)
+      val endpoints = brokerInfo.get.endPoints.toString
+      assertTrue(s"Endpoint update not saved $endpoints", endpointMatcher(endpoints))
+    }
+
+    // Verify that endpoints have been updated in ZK for all brokers
+    servers.foreach(validateEndpointsInZooKeeper(_, endpoints => endpoints.contains(invalidHost)))
+
+    // Trigger session expiry and ensure that controller registers new advertised listener after expiry
+    val controllerEpoch = zkClient.getControllerEpoch
+    val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller")))
+    val controllerZkClient = controllerServer.zkClient
+    val sessionExpiringClient = createZooKeeperClientToTriggerSessionExpiry(controllerZkClient.currentZooKeeper)
+    sessionExpiringClient.close()
+    TestUtils.waitUntilTrue(() => zkClient.getControllerEpoch != controllerEpoch,
+      "Controller not re-elected after ZK session expiry")
+    TestUtils.retry(10000)(validateEndpointsInZooKeeper(controllerServer, endpoints => endpoints.contains(invalidHost)))
+
     // Verify that producer connections fail since advertised listener is invalid
     val bootstrap = bootstrapServers.replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed
     val producer1 = createProducer(trustStoreFile1, retries = 0, bootstrap = bootstrap)
@@ -606,6 +626,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val sendFuture = verifyConnectionFailure(producer1)
 
     alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
+    servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost)))
 
     // Verify that produce/consume work now
     val producer = createProducer(trustStoreFile1, retries = 0)
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 9e72583..a122297 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
 import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 
 @Category(Array(classOf[IntegrationTest]))
 abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@@ -67,6 +68,18 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
       CoreUtils.swallow(zookeeper.shutdown(), this)
     Configuration.setConfiguration(null)
   }
+
+  // Trigger session expiry by reusing the session id in another client
+  def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = {
+    val dummyWatcher = new Watcher {
+      override def process(event: WatchedEvent): Unit = {}
+    }
+    val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
+      zooKeeper.getSessionId,
+      zooKeeper.getSessionPasswd)
+    assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
+    anotherZkClient
+  }
 }
 
 object ZooKeeperTestHarness {
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index f1c09d7..2e0651c 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -29,8 +29,8 @@ import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
 import org.apache.zookeeper.ZooKeeper.States
-import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}
-import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertNull, assertTrue}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertTrue}
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
@@ -456,14 +456,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       requestThread.start()
       sendCompleteSemaphore.acquire() // Wait for request thread to start processing requests
 
-      // Trigger session expiry by reusing the session id in another client
-      val dummyWatcher = new Watcher {
-        override def process(event: WatchedEvent): Unit = {}
-      }
-      val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
-        zooKeeperClient.currentZooKeeper.getSessionId,
-        zooKeeperClient.currentZooKeeper.getSessionPasswd)
-      assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
+      val anotherZkClient = createZooKeeperClientToTriggerSessionExpiry(zooKeeperClient.currentZooKeeper)
       sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail
       anotherZkClient.close()
       sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail
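
As a side note on the technique behind the new `createZooKeeperClientToTriggerSessionExpiry` helper: ZooKeeper expires a session when a second handle connects with the same session id and password and is then closed. Below is a minimal standalone sketch of that idea; the object name and the connect string 127.0.0.1:2181 are assumptions for illustration, not part of the patch.

    import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

    object SessionExpiryDemo extends App {
      val connect = "127.0.0.1:2181" // assumed local ZooKeeper ensemble; adjust as needed
      val noOpWatcher = new Watcher { override def process(event: WatchedEvent): Unit = {} }

      // Client whose session we want to expire (stands in for the broker's ZK client).
      val victim = new ZooKeeper(connect, 30000, noOpWatcher)

      // Second handle that reuses the victim's session id and password.
      val hijacker = new ZooKeeper(connect, 1000, noOpWatcher,
        victim.getSessionId, victim.getSessionPasswd)
      hijacker.exists("/nonexistent", false) // confirm the second handle is connected

      // Closing the second handle closes the shared session, so the victim observes
      // an Expired event and must recreate its ephemeral znodes after reconnecting.
      hijacker.close()
      victim.close()
    }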
