You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/02/23 20:51:43 UTC

kafka git commit: kafka-1971; starting a broker with a conflicting id will delete the previous broker registration; patched by Jun Rao; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 3f1e08822 -> 41189ea56


kafka-1971; starting a broker with a conflicting id will delete the previous broker registration; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41189ea5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41189ea5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41189ea5

Branch: refs/heads/trunk
Commit: 41189ea5601837bdb697ade31f55e244abbe6d1c
Parents: 3f1e088
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Feb 23 11:51:32 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Feb 23 11:51:32 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/KafkaHealthcheck.scala   |  7 +---
 .../main/scala/kafka/server/KafkaServer.scala   |  2 -
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  6 ---
 .../unit/kafka/server/ServerStartupTest.scala   | 42 ++++++++++++++------
 4 files changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/41189ea5/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4acdd70..7907987 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -39,17 +39,12 @@ class KafkaHealthcheck(private val brokerId: Int,
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
   val sessionExpireListener = new SessionExpireListener
-  
+
   def startup() {
     zkClient.subscribeStateChanges(sessionExpireListener)
     register()
   }
 
-  def shutdown() {
-    zkClient.unsubscribeStateChanges(sessionExpireListener)
-    ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
-  }
-
   /**
    * Register this broker as "alive" in zookeeper
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/41189ea5/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7e5ddcb..426e522 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -310,8 +310,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       if (canShutdown) {
         Utils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
-        if(kafkaHealthcheck != null)
-          Utils.swallow(kafkaHealthcheck.shutdown())
         if(socketServer != null)
           Utils.swallow(socketServer.shutdown())
         if(requestHandlerPool != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/41189ea5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c78a1b6..8a2fb2d 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -189,12 +189,6 @@ object ZkUtils extends Logging {
     info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
-  def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
-    val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
-    deletePath(zkClient, brokerIdPath)
-    info("Deregistered broker %d at path %s.".format(id, brokerIdPath))
-  }
-
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
     val topicDirs = new ZKGroupTopicDirs(group, topic)
     topicDirs.consumerOwnerDir + "/" + partition

http://git-wip-us.apache.org/repos/asf/kafka/blob/41189ea5/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 764655a..93af7df 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -26,26 +26,44 @@ import kafka.zk.ZooKeeperTestHarness
 import junit.framework.Assert._
 
 class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
-  var server : KafkaServer = null
-  val brokerId = 0
-  val zookeeperChroot = "/kafka-chroot-for-unittest"
 
-  override def setUp() {
-    super.setUp()
+  def testBrokerCreatesZKChroot {
+    val brokerId = 0
+    val zookeeperChroot = "/kafka-chroot-for-unittest"
     val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
     val zooKeeperConnect = props.get("zookeeper.connect")
     props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
-    server = TestUtils.createServer(new KafkaConfig(props))
-  }
+    val server = TestUtils.createServer(new KafkaConfig(props))
+
+    val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
+    assertTrue(pathExists)
 
-  override def tearDown() {
     server.shutdown()
     Utils.rm(server.config.logDirs)
-    super.tearDown()
   }
 
-  def testBrokerCreatesZKChroot {
-    val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
-    assertTrue(pathExists)
+  def testConflictBrokerRegistration {
+    // Try starting a broker with the a conflicting broker id.
+    // This shouldn't affect the existing broker registration.
+
+    val brokerId = 0
+    val props1 = TestUtils.createBrokerConfig(brokerId)
+    val server1 = TestUtils.createServer(new KafkaConfig(props1))
+    val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
+
+    val props2 = TestUtils.createBrokerConfig(brokerId)
+    try {
+      TestUtils.createServer(new KafkaConfig(props2))
+      fail("Registering a broker with a conflicting id should fail")
+    } catch {
+      case e : RuntimeException =>
+      // this is expected
+    }
+
+    // broker registration shouldn't change
+    assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
+
+    server1.shutdown()
+    Utils.rm(server1.config.logDirs)
   }
 }
\ No newline at end of file