You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/02/23 20:51:43 UTC
kafka git commit: kafka-1971;
starting a broker with a conflicting id will delete the previous
broker registration; patched by Jun Rao; reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 3f1e08822 -> 41189ea56
kafka-1971; starting a broker with a conflicting id will delete the previous broker registration; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41189ea5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41189ea5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41189ea5
Branch: refs/heads/trunk
Commit: 41189ea5601837bdb697ade31f55e244abbe6d1c
Parents: 3f1e088
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Feb 23 11:51:32 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Feb 23 11:51:32 2015 -0800
----------------------------------------------------------------------
.../scala/kafka/server/KafkaHealthcheck.scala | 7 +---
.../main/scala/kafka/server/KafkaServer.scala | 2 -
core/src/main/scala/kafka/utils/ZkUtils.scala | 6 ---
.../unit/kafka/server/ServerStartupTest.scala | 42 ++++++++++++++------
4 files changed, 31 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/41189ea5/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4acdd70..7907987 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -39,17 +39,12 @@ class KafkaHealthcheck(private val brokerId: Int,
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
val sessionExpireListener = new SessionExpireListener
-
+
def startup() {
zkClient.subscribeStateChanges(sessionExpireListener)
register()
}
- def shutdown() {
- zkClient.unsubscribeStateChanges(sessionExpireListener)
- ZkUtils.deregisterBrokerInZk(zkClient, brokerId)
- }
-
/**
* Register this broker as "alive" in zookeeper
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/41189ea5/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7e5ddcb..426e522 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -310,8 +310,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
if (canShutdown) {
Utils.swallow(controlledShutdown())
brokerState.newState(BrokerShuttingDown)
- if(kafkaHealthcheck != null)
- Utils.swallow(kafkaHealthcheck.shutdown())
if(socketServer != null)
Utils.swallow(socketServer.shutdown())
if(requestHandlerPool != null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/41189ea5/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index c78a1b6..8a2fb2d 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -189,12 +189,6 @@ object ZkUtils extends Logging {
info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
}
- def deregisterBrokerInZk(zkClient: ZkClient, id: Int) {
- val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
- deletePath(zkClient, brokerIdPath)
- info("Deregistered broker %d at path %s.".format(id, brokerIdPath))
- }
-
def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
val topicDirs = new ZKGroupTopicDirs(group, topic)
topicDirs.consumerOwnerDir + "/" + partition
http://git-wip-us.apache.org/repos/asf/kafka/blob/41189ea5/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 764655a..93af7df 100644
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -26,26 +26,44 @@ import kafka.zk.ZooKeeperTestHarness
import junit.framework.Assert._
class ServerStartupTest extends JUnit3Suite with ZooKeeperTestHarness {
- var server : KafkaServer = null
- val brokerId = 0
- val zookeeperChroot = "/kafka-chroot-for-unittest"
- override def setUp() {
- super.setUp()
+ def testBrokerCreatesZKChroot {
+ val brokerId = 0
+ val zookeeperChroot = "/kafka-chroot-for-unittest"
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.choosePort())
val zooKeeperConnect = props.get("zookeeper.connect")
props.put("zookeeper.connect", zooKeeperConnect + zookeeperChroot)
- server = TestUtils.createServer(new KafkaConfig(props))
- }
+ val server = TestUtils.createServer(new KafkaConfig(props))
+
+ val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
+ assertTrue(pathExists)
- override def tearDown() {
server.shutdown()
Utils.rm(server.config.logDirs)
- super.tearDown()
}
- def testBrokerCreatesZKChroot {
- val pathExists = ZkUtils.pathExists(zkClient, zookeeperChroot)
- assertTrue(pathExists)
+ def testConflictBrokerRegistration {
+ // Try starting a broker with the a conflicting broker id.
+ // This shouldn't affect the existing broker registration.
+
+ val brokerId = 0
+ val props1 = TestUtils.createBrokerConfig(brokerId)
+ val server1 = TestUtils.createServer(new KafkaConfig(props1))
+ val brokerRegistration = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1
+
+ val props2 = TestUtils.createBrokerConfig(brokerId)
+ try {
+ TestUtils.createServer(new KafkaConfig(props2))
+ fail("Registering a broker with a conflicting id should fail")
+ } catch {
+ case e : RuntimeException =>
+ // this is expected
+ }
+
+ // broker registration shouldn't change
+ assertEquals(brokerRegistration, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1)
+
+ server1.shutdown()
+ Utils.rm(server1.config.logDirs)
}
}
\ No newline at end of file