You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/21 16:44:00 UTC

[jira] [Commented] (KAFKA-6573) KafkaController.brokerInfo not updated on dynamic update

    [ https://issues.apache.org/jira/browse/KAFKA-6573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16371644#comment-16371644 ] 

ASF GitHub Bot commented on KAFKA-6573:
---------------------------------------

hachikuji closed pull request #4603: KAFKA-6573: Update brokerInfo in KafkaController on listener update
URL: https://github.com/apache/kafka/pull/4603
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the diff is reproduced
below (GitHub would not otherwise display it after the merge):

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f36fc798ad6..a8707ad887d 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -190,6 +190,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     eventManager.put(controlledShutdownEvent)
   }
 
+  private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = {
+    this.brokerInfo = newBrokerInfo
+    zkClient.updateBrokerInfoInZk(newBrokerInfo)
+  }
+
   private def state: ControllerState = eventManager.state
 
   /**
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index ce4b9e75b7c..3236af01bbb 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -751,7 +751,7 @@ class DynamicListenerConfig(server: KafkaServer) extends BrokerReconfigurable wi
     if (listenersAdded.nonEmpty)
       server.socketServer.addListeners(listenersAdded)
 
-    server.zkClient.updateBrokerInfoInZk(server.createBrokerInfo)
+    server.kafkaController.updateBrokerInfo(server.createBrokerInfo)
   }
 
   private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index afc8202b88a..6545fde30e9 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.security.token.delegation.{DelegationToken, Token
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 import org.apache.zookeeper.data.{ACL, Stat}
-import org.apache.zookeeper.{CreateMode, KeeperException}
+import org.apache.zookeeper.{CreateMode, KeeperException, ZooKeeper}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, mutable}
@@ -61,6 +61,9 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
 
   import KafkaZkClient._
 
+  // Only for testing
+  private[kafka] def currentZooKeeper: ZooKeeper = zooKeeperClient.currentZooKeeper
+
   /**
    * Create a sequential persistent path. That is, the znode will not be automatically deleted upon client's disconnect
    * and a monotonically increasing number will be appended to its name.
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 3934fd0ad5d..efbd6e898a8 100644
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -317,7 +317,7 @@ class ZooKeeperClient(connectString: String,
   }
 
   // Only for testing
-  private[zookeeper] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) {
+  private[kafka] def currentZooKeeper: ZooKeeper = inReadLock(initializationLock) {
     zooKeeper
   }
   
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index ee3876231b7..f0bd61a0ccc 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -599,6 +599,26 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val invalidHost = "192.168.0.1"
     alterAdvertisedListener(adminClient, externalAdminClient, "localhost", invalidHost)
 
+    def validateEndpointsInZooKeeper(server: KafkaServer, endpointMatcher: String => Boolean): Unit = {
+      val brokerInfo = zkClient.getBroker(server.config.brokerId)
+      assertTrue("Broker not registered", brokerInfo.nonEmpty)
+      val endpoints = brokerInfo.get.endPoints.toString
+      assertTrue(s"Endpoint update not saved $endpoints", endpointMatcher(endpoints))
+    }
+
+    // Verify that endpoints have been updated in ZK for all brokers
+    servers.foreach(validateEndpointsInZooKeeper(_, endpoints => endpoints.contains(invalidHost)))
+
+    // Trigger session expiry and ensure that controller registers new advertised listener after expiry
+    val controllerEpoch = zkClient.getControllerEpoch
+    val controllerServer = servers(zkClient.getControllerId.getOrElse(throw new IllegalStateException("No controller")))
+    val controllerZkClient = controllerServer.zkClient
+    val sessionExpiringClient = createZooKeeperClientToTriggerSessionExpiry(controllerZkClient.currentZooKeeper)
+    sessionExpiringClient.close()
+    TestUtils.waitUntilTrue(() => zkClient.getControllerEpoch != controllerEpoch,
+      "Controller not re-elected after ZK session expiry")
+    TestUtils.retry(10000)(validateEndpointsInZooKeeper(controllerServer, endpoints => endpoints.contains(invalidHost)))
+
     // Verify that producer connections fail since advertised listener is invalid
     val bootstrap = bootstrapServers.replaceAll(invalidHost, "localhost") // allow bootstrap connection to succeed
     val producer1 = createProducer(trustStoreFile1, retries = 0, bootstrap = bootstrap)
@@ -606,6 +626,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
     val sendFuture = verifyConnectionFailure(producer1)
 
     alterAdvertisedListener(adminClient, externalAdminClient, invalidHost, "localhost")
+    servers.foreach(validateEndpointsInZooKeeper(_, endpoints => !endpoints.contains(invalidHost)))
 
     // Verify that produce/consume work now
     val producer = createProducer(trustStoreFile1, retries = 0)
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 9e7258315d8..a1222977751 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import kafka.controller.ControllerEventManager
 import org.apache.kafka.common.utils.Time
+import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 
 @Category(Array(classOf[IntegrationTest]))
 abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
@@ -67,6 +68,18 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging {
       CoreUtils.swallow(zookeeper.shutdown(), this)
     Configuration.setConfiguration(null)
   }
+
+  // Trigger session expiry by reusing the session id in another client
+  def createZooKeeperClientToTriggerSessionExpiry(zooKeeper: ZooKeeper): ZooKeeper = {
+    val dummyWatcher = new Watcher {
+      override def process(event: WatchedEvent): Unit = {}
+    }
+    val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
+      zooKeeper.getSessionId,
+      zooKeeper.getSessionPasswd)
+    assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
+    anotherZkClient
+  }
 }
 
 object ZooKeeperTestHarness {
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index f1c09d7308d..2e0651c9991 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -29,8 +29,8 @@ import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException}
 import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
 import org.apache.zookeeper.ZooKeeper.States
-import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}
-import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertNull, assertTrue}
+import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertFalse, assertTrue}
 import org.junit.{After, Before, Test}
 
 import scala.collection.JavaConverters._
@@ -456,14 +456,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
       requestThread.start()
       sendCompleteSemaphore.acquire() // Wait for request thread to start processing requests
 
-      // Trigger session expiry by reusing the session id in another client
-      val dummyWatcher = new Watcher {
-        override def process(event: WatchedEvent): Unit = {}
-      }
-      val anotherZkClient = new ZooKeeper(zkConnect, 1000, dummyWatcher,
-        zooKeeperClient.currentZooKeeper.getSessionId,
-        zooKeeperClient.currentZooKeeper.getSessionPasswd)
-      assertNull(anotherZkClient.exists("/nonexistent", false)) // Make sure new client works
+      val anotherZkClient = createZooKeeperClientToTriggerSessionExpiry(zooKeeperClient.currentZooKeeper)
       sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail
       anotherZkClient.close()
       sendSemaphore.release(maxInflightRequests) // Resume a few more sends which may fail


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> KafkaController.brokerInfo not updated on dynamic update
> --------------------------------------------------------
>
>                 Key: KAFKA-6573
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6573
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 1.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.1.0
>
>
> KafkaController.brokerInfo is cached in memory and is used to re-register the broker in ZooKeeper if the ZK session expires. It should be kept up to date when listeners are dynamically updated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)