You are viewing a plain text version of this content. (The hyperlink to the canonical version, originally attached to the word "here", was lost in this plain-text rendering.)
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:09 UTC
[24/36] git commit: KAFKA-992 followup: Fix zookeeper de-registration
bug for controller and consumer; reviewed by Neha Narkhede
KAFKA-992 followup: Fix zookeeper de-registration bug for controller and consumer; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1db824ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1db824ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1db824ed
Branch: refs/heads/trunk
Commit: 1db824ed2fcdaa45c3b1d0dcbf9101299fded09c
Parents: 1d75e09
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Aug 9 15:44:00 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Aug 9 15:44:11 2013 -0700
----------------------------------------------------------------------
.../consumer/ZookeeperConsumerConnector.scala | 30 +++++-
.../kafka/controller/KafkaController.scala | 3 +-
.../kafka/server/ZookeeperLeaderElector.scala | 98 ++++++++++++++++----
core/src/main/scala/kafka/utils/ZkUtils.scala | 20 +++-
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
5 files changed, 126 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0ca2850..17977e7 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -213,13 +213,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
// this API is used by unit tests only
def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
- private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+ /**
+ * Registers this consumer as the ephemeral node dirs.consumerRegistryDir/consumerIdString, whose payload is a
+ * JSON record with version, subscription, pattern and timestamp fields. Blocks until the create succeeds:
+ * if the node already exists and is readable, it is assumed to be left over from this consumer's previous
+ * (expired) session, so sleep for config.zkSessionTimeoutMs and retry; if it vanished in between, retry at once.
+ * Exceptions other than ZkNodeExistsException propagate to the caller.
+ * NOTE(review): the existing node's contents are never inspected before backing off.
+ */
+ private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
info("begin registering consumer " + consumerIdString + " in ZK")
+ val timestamp = SystemTime.milliseconds.toString
val consumerRegistrationInfo =
Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
- ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
- createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
- info("end registering consumer " + consumerIdString + " in ZK")
+ ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
+
+ // Loop until the create succeeds; the only exit is the `return` below.
+ while (true) {
+ try {
+ createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
+
+ info("end registering consumer " + consumerIdString + " in ZK")
+ return
+ } catch {
+ case e: ZkNodeExistsException => {
+ // An ephemeral node may still exist even after its corresponding session has expired
+ // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
+ // and hence the write succeeds without ZkNodeExistsException
+ ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match {
+ case Some(consumerZKString) => {
+ info("I wrote this conflicted ephemeral node a while back in a different session, "
+ + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
+ // Back off one full session timeout before retrying the create.
+ Thread.sleep(config.zkSessionTimeoutMs)
+ }
+ case None => // the node disappeared; retry creating the ephemeral node immediately
+ }
+ }
+ }
+ }
}
private def sendShutdownToAllQueues() = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index c87caab..800f900 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,6 +37,7 @@ import scala.Some
import kafka.common.TopicAndPartition
class ControllerContext(val zkClient: ZkClient,
+ val zkSessionTimeout: Int,
var controllerChannelManager: ControllerChannelManager = null,
val controllerLock: Object = new Object,
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
@@ -83,7 +84,7 @@ object KafkaController {
class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
- val controllerContext = new ControllerContext(zkClient)
+ val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
private val partitionStateMachine = new PartitionStateMachine(this)
private val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 574922b..d785db9 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,10 +17,11 @@
package kafka.server
import kafka.utils.ZkUtils._
-import kafka.utils.Logging
+import kafka.utils.{Json, Utils, SystemTime, Logging}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.IZkDataListener
import kafka.controller.ControllerContext
+import kafka.common.KafkaException
/**
* This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -46,23 +47,63 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
+ /**
+ * Tries to become leader by creating the ephemeral node at electionPath with a JSON payload
+ * (version, brokerid, timestamp). On ZkNodeExistsException the current holder's id is read into
+ * leaderId, accepting the JSON format and falling back to the old plain-integer format. If the
+ * node carries this broker's own id it is treated as a leftover from a previous session: back off
+ * for controllerContext.zkSessionTimeout ms and retry. Any other failure is logged and resign() is called.
+ * Returns amILeader.
+ */
def elect: Boolean = {
controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
- try {
- createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
- info(brokerId + " successfully elected as leader")
- leaderId = brokerId
- onBecomingLeader()
- } catch {
- case e: ZkNodeExistsException =>
- // If someone else has written the path, then
- val data: String = controllerContext.zkClient.readData(electionPath, true)
- debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
- if (data != null) {
- leaderId = data.toInt
- }
- case e2 =>
- error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
- resign()
- }
+ val timestamp = SystemTime.milliseconds.toString
+ val electString =
+ Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
+ ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
+
+ var electNotDone = true
+ do {
+ electNotDone = false
+ try {
+ createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString)
+
+ info(brokerId + " successfully elected as leader")
+ leaderId = brokerId
+ onBecomingLeader()
+ } catch {
+ case e: ZkNodeExistsException =>
+ readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+ // If someone else has written the path, then read the broker id
+ case Some(controllerString) =>
+ try {
+ Json.parseFull(controllerString) match {
+ case Some(m) =>
+ val controllerInfo = m.asInstanceOf[Map[String, Any]]
+ leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
+ if (leaderId != brokerId) {
+ info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+ } else {
+ info("I wrote this conflicted ephemeral node a while back in a different session, "
+ + "hence I will retry")
+ // Only flag the retry here; the back-off sleep happens after the try below.
+ electNotDone = true
+ }
+ case None =>
+ warn("Error while reading leader info %s on broker %d, may be it is an old version".format(controllerString, brokerId))
+ // Routed to the old-format fallback in the catch below.
+ throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. May be it is an old version".format(controllerString))
+ }
+ } catch {
+ case t =>
+ // It may be due to an incompatible controller register version
+ info("Failed to parse the controller info as json. " +
+ "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerString))
+ try {
+ leaderId = controllerString.toInt
+ info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+ } catch {
+ case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.".format(controllerString), t)
+ }
+ }
+ case None =>
+ // The node disappeared between the create and the read. electNotDone is false here, so this
+ // does NOT retry; presumably the data-change listener subscribed above triggers a new election — TODO confirm.
+ }
+ case e2 =>
+ error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+ resign()
+ }
+ // Back off outside the try above so an InterruptedException from the sleep propagates as-is
+ // instead of being caught by the catch-all and reported as a leader-info parse failure.
+ if (electNotDone)
+ Thread.sleep(controllerContext.zkSessionTimeout)
+ } while (electNotDone)
+
amILeader
}
@@ -88,8 +129,25 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
+ /**
+ * Data-change callback for the election path: under controllerLock, updates leaderId from the new
+ * node payload. Accepts the JSON format (brokerid field) and falls back to the old plain-integer
+ * format; throws KafkaException if the payload is in neither format.
+ */
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
controllerContext.controllerLock synchronized {
- leaderId = data.toString.toInt
- info("New leader is %d".format(leaderId))
+ try {
+ Json.parseFull(data.toString) match {
+ case Some(m) =>
+ val controllerInfo = m.asInstanceOf[Map[String, Any]]
+ leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
+ info("New leader is %d".format(leaderId))
+ case None =>
+ // Not a JSON object (e.g. the old plain-integer format): hand off to the fallback in the catch
+ // below instead of only logging and leaving leaderId stale, as elect and ZkUtils.getController do.
+ throw new KafkaException("Failed to parse the leader info [%s] from zookeeper as json".format(data.toString))
+ }
+ } catch {
+ case t =>
+ // It may be due to an incompatible controller register version
+ try {
+ leaderId = data.toString.toInt
+ info("New leader is %d".format(leaderId))
+ } catch {
+ case t => throw new KafkaException("Failed to parse the leader info from zookeeper: " + data, t)
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0072a1a..9772af8 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -54,7 +54,25 @@ object ZkUtils extends Logging {
+ /**
+ * Returns the broker id of the current controller stored at ControllerPath. Reads the JSON
+ * format (brokerid field); if the JSON parse fails or yields None, falls back to interpreting
+ * the data as a plain integer (the old format). Throws KafkaException if the node does not
+ * exist or its data is in neither format.
+ */
def getController(zkClient: ZkClient): Int= {
readDataMaybeNull(zkClient, ControllerPath)._1 match {
- case Some(controller) => controller.toInt
+ case Some(controller) =>
+ try {
+ Json.parseFull(controller) match {
+ case Some(m) =>
+ val controllerInfo = m.asInstanceOf[Map[String, Any]]
+ controllerInfo.get("brokerid").get.asInstanceOf[Int]
+ // Routed to the old-format fallback in the catch below.
+ case None => throw new KafkaException("Failed to parse the controller info json [%s] from zookeeper.".format(controller))
+ }
+ } catch {
+ case t =>
+ // It may be due to an incompatible controller register version
+ info("Failed to parse the controller info as json. " +
+ "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controller))
+ try {
+ controller.toInt
+ } catch {
+ case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.".format(controller), t)
+ }
+ }
case None => throw new KafkaException("Controller doesn't exist")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index c4328f0..70e4b51 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -124,7 +124,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
val controllerId = 2
val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
- val controllerContext = new ControllerContext(zkClient)
+ val controllerContext = new ControllerContext(zkClient, 6000)
controllerContext.liveBrokers = brokers.toSet
val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
controllerChannelManager.startup()