You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was lost in this plain-text export).
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:08 UTC
[23/36] git commit: Revert "KAFKA-992 follow up: Fix the zookeeper
de-registration issue for controller and consumer; reviewed by Neha Narkhede"
Revert "KAFKA-992 follow up: Fix the zookeeper de-registration issue for controller and consumer; reviewed by Neha Narkhede"
This reverts commit 81c49bbdae5e490f9d5dc7b042ee60e617fbb22b.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d75e093
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d75e093
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d75e093
Branch: refs/heads/trunk
Commit: 1d75e09313b5b3f1cde6c39fdda20bc7185cfdf6
Parents: 81c49bb
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Aug 9 15:29:08 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Aug 9 15:29:08 2013 -0700
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 30 +-----
.../kafka/controller/KafkaController.scala | 3 +-
.../kafka/server/ZookeeperLeaderElector.scala | 97 ++++----------------
core/src/main/scala/kafka/utils/ZkUtils.scala | 22 +----
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
5 files changed, 28 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 17977e7..0ca2850 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -213,35 +213,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// this API is used by unit tests only
def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
- private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
+ private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
info("begin registering consumer " + consumerIdString + " in ZK")
- val timestamp = SystemTime.milliseconds.toString
val consumerRegistrationInfo =
Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
- ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
-
- while (true) {
- try {
- createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
-
- info("end registering consumer " + consumerIdString + " in ZK")
- return
- } catch {
- case e: ZkNodeExistsException => {
- // An ephemeral node may still exist even after its corresponding session has expired
- // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
- // and hence the write succeeds without ZkNodeExistsException
- ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match {
- case Some(consumerZKString) => {
- info("I wrote this conflicted ephemeral node a while back in a different session, "
- + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
- Thread.sleep(config.zkSessionTimeoutMs)
- }
- case None => // the node disappeared; retry creating the ephemeral node immediately
- }
- }
- }
- }
+ ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
+ createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
+ info("end registering consumer " + consumerIdString + " in ZK")
}
private def sendShutdownToAllQueues() = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 800f900..c87caab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,7 +37,6 @@ import scala.Some
import kafka.common.TopicAndPartition
class ControllerContext(val zkClient: ZkClient,
- val zkSessionTimeout: Int,
var controllerChannelManager: ControllerChannelManager = null,
val controllerLock: Object = new Object,
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
@@ -84,7 +83,7 @@ object KafkaController {
class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
- val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
+ val controllerContext = new ControllerContext(zkClient)
private val partitionStateMachine = new PartitionStateMachine(this)
private val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 6016bd5..574922b 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,11 +17,10 @@
package kafka.server
import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, SystemTime, Logging}
+import kafka.utils.Logging
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.IZkDataListener
import kafka.controller.ControllerContext
-import kafka.common.KafkaException
/**
* This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -47,62 +46,23 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
def elect: Boolean = {
controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
- val timestamp = SystemTime.milliseconds.toString
- val electString =
- Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
- ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
-
- var electNotDone = true
- do {
- electNotDone = false
- try {
- createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString)
-
- info(brokerId + " successfully elected as leader")
- leaderId = brokerId
- onBecomingLeader()
- } catch {
- case e: ZkNodeExistsException =>
- readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
- // If someone else has written the path, then read the broker id
- case Some(controllerString) =>
- try {
- Json.parseFull(controllerString) match {
- case Some(m) =>
- val controllerInfo = m.asInstanceOf[Map[String, Any]]
- leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
- if (leaderId != brokerId) {
- info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
- } else {
- info("I wrote this conflicted ephemeral node a while back in a different session, "
- + "hence I will retry")
- electNotDone = true
- Thread.sleep(controllerContext.zkSessionTimeout)
- }
- case None =>
- error("Error while reading leader info %s on broker %d".format(controllerString, brokerId))
- resign()
- }
- } catch {
- case t =>
- // It may be due to an incompatible controller register version
- info("Failed to parse the controller info as json. " +
- "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerString))
- try {
- leaderId = controllerString.toInt
- info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
- } catch {
- case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
- }
- }
- }
- // If the node disappear, retry immediately
- case e2 =>
- error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
- resign()
- }
- } while (electNotDone)
-
+ try {
+ createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
+ info(brokerId + " successfully elected as leader")
+ leaderId = brokerId
+ onBecomingLeader()
+ } catch {
+ case e: ZkNodeExistsException =>
+ // If someone else has written the path, then
+ val data: String = controllerContext.zkClient.readData(electionPath, true)
+ debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
+ if (data != null) {
+ leaderId = data.toInt
+ }
+ case e2 =>
+ error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+ resign()
+ }
amILeader
}
@@ -128,25 +88,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
controllerContext.controllerLock synchronized {
- try {
- Json.parseFull(data.toString) match {
- case Some(m) =>
- val controllerInfo = m.asInstanceOf[Map[String, Any]]
- leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
- info("New leader is %d".format(leaderId))
- case None =>
- error("Error while reading the leader info %s".format(data.toString))
- }
- } catch {
- case t =>
- // It may be due to an incompatible controller register version
- try {
- leaderId = data.toString.toInt
- info("New leader is %d".format(leaderId))
- } catch {
- case t => throw new KafkaException("Failed to parse the leader info from zookeeper: " + data, t)
- }
- }
+ leaderId = data.toString.toInt
+ info("New leader is %d".format(leaderId))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 8440d94..0072a1a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -54,25 +54,7 @@ object ZkUtils extends Logging {
def getController(zkClient: ZkClient): Int= {
readDataMaybeNull(zkClient, ControllerPath)._1 match {
- case Some(controller) =>
- try {
- Json.parseFull(controller) match {
- case Some(m) =>
- val controllerInfo = m.asInstanceOf[Map[String, Any]]
- controllerInfo.get("brokerid").get.asInstanceOf[Int]
- case None => throw new KafkaException("Failed to parse the controller info json [%s] from zookeeper.".format(controller))
- }
- } catch {
- case t =>
- // It may be due to an incompatible controller register version
- info("Failed to parse the controller info as json. " +
- "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controller))
- try {
- controller.toInt
- } catch {
- case t => throw new KafkaException("Failed to parse the controller info [%s] from zookeeper. This is neither the new or the old format.", t)
- }
- }
+ case Some(controller) => controller.toInt
case None => throw new KafkaException("Controller doesn't exist")
}
}
@@ -222,7 +204,7 @@ object ZkUtils extends Logging {
case Some(brokerZKString) => {
val broker = Broker.createBroker(id, brokerZKString)
if (broker.host == host && broker.port == port) {
- info("I wrote this conflicted ephemeral node a while back in a different session, "
+ info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString)
+ "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
Thread.sleep(timeout)
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 70e4b51..c4328f0 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -124,7 +124,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val controllerId = 2
val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
- val controllerContext = new ControllerContext(zkClient, 6000)
+ val controllerContext = new ControllerContext(zkClient)
controllerContext.liveBrokers = brokers.toSet
val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
controllerChannelManager.startup()