You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/05/26 16:50:51 UTC

kafka git commit: Setting broker state as running after publishing to ZK

Repository: kafka
Updated Branches:
  refs/heads/trunk 62dc1afb6 -> be0f3502d


Setting broker state as running after publishing to ZK

junrao

Currently, the broker state is set to running before the broker registers itself in ZooKeeper, which is too early in the broker lifecycle. If clients use the broker state as an indicator that the broker is ready to accept requests, they will get errors. This change delays setting the broker state to running until the broker has registered itself in ZK.

Author: Roger Hoover <ro...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #1426 from theduderog/broker-running-after-zk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be0f3502
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be0f3502
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be0f3502

Branch: refs/heads/trunk
Commit: be0f3502da1f703b27d7a3fae5a01325dff44957
Parents: 62dc1af
Author: Roger Hoover <ro...@gmail.com>
Authored: Thu May 26 09:50:49 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu May 26 09:50:49 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/server/KafkaServer.scala   |  2 +-
 .../unit/kafka/server/ServerStartupTest.scala   | 28 +++++++++++++++++++-
 2 files changed, 28 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be0f3502/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index de3054a..f95d9ef 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -214,7 +214,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
           kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
-        brokerState.newState(RunningAsBroker)
 
         Mx4jLoader.maybeLoad()
 
@@ -249,6 +248,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         /* register broker metrics */
         registerStats()
 
+        brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
         isStartingUp.set(false)

http://git-wip-us.apache.org/repos/asf/kafka/blob/be0f3502/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 9b49365..b5560c3 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -20,8 +20,8 @@ package kafka.server
 import kafka.utils.ZkUtils
 import kafka.utils.CoreUtils
 import kafka.utils.TestUtils
-
 import kafka.zk.ZooKeeperTestHarness
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
 
@@ -82,4 +82,30 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     server.shutdown()
     CoreUtils.delete(server.config.logDirs)
   }
+
+  @Test
+  def testBrokerStateRunningAfterZK {
+    val brokerId = 0
+    val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState])
+
+    class BrokerStateInterceptor() extends BrokerState {
+      override def newState(newState: BrokerStates): Unit = {
+        val brokers = zkUtils.getAllBrokersInCluster()
+        assertEquals(1, brokers.size)
+        assertEquals(brokerId, brokers.head.id)
+      }
+    }
+
+    class MockKafkaServer(override val config: KafkaConfig, override val brokerState: BrokerState = mockBrokerState) extends KafkaServer(config) {}
+
+    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
+    val server = new MockKafkaServer(KafkaConfig.fromProps(props))
+
+    EasyMock.expect(mockBrokerState.newState(RunningAsBroker)).andDelegateTo(new BrokerStateInterceptor).once()
+    EasyMock.replay(mockBrokerState)
+
+    server.startup()
+    server.shutdown()
+    CoreUtils.delete(server.config.logDirs)
+  }
 }