Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:09 UTC

[24/36] git commit: KAFKA-992 followup: Fix zookeeper de-registration bug for controller and consumer; reviewed by Neha Narkhede

KAFKA-992 followup: Fix zookeeper de-registration bug for controller and consumer; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1db824ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1db824ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1db824ed

Branch: refs/heads/trunk
Commit: 1db824ed2fcdaa45c3b1d0dcbf9101299fded09c
Parents: 1d75e09
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Aug 9 15:44:00 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Aug 9 15:44:11 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 30 +++++-
 .../kafka/controller/KafkaController.scala      |  3 +-
 .../kafka/server/ZookeeperLeaderElector.scala   | 98 ++++++++++++++++----
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 20 +++-
 .../unit/kafka/server/LeaderElectionTest.scala  |  2 +-
 5 files changed, 126 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 0ca2850..17977e7 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -213,13 +213,35 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // this API is used by unit tests only
   def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
 
-  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
+  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
     info("begin registering consumer " + consumerIdString + " in ZK")
+    val timestamp = SystemTime.milliseconds.toString
     val consumerRegistrationInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
-                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
-    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
-    info("end registering consumer " + consumerIdString + " in ZK")
+                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
+
+    while (true) {
+      try {
+        createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
+
+        info("end registering consumer " + consumerIdString + " in ZK")
+        return
+      } catch {
+        case e: ZkNodeExistsException => {
+          // An ephemeral node may still exist even after its corresponding session has expired
+          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
+          // and hence the write succeeds without ZkNodeExistsException
+          ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match {
+            case Some(consumerZKString) => {
+              info("I wrote this conflicted ephemeral node a while back in a different session, "
+                + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
+              Thread.sleep(config.zkSessionTimeoutMs)
+            }
+            case None => // the node disappeared; retry creating the ephemeral node immediately
+          }
+        }
+      }
+    }
   }
 
   private def sendShutdownToAllQueues() = {

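The retry loop above handles a known ZooKeeper quirk: after a session expires, the ephemeral registration node from that session can linger for a while, so re-registering the same consumer id fails with ZkNodeExistsException until ZooKeeper finally removes it. Stripped of Kafka's helpers, the pattern is roughly the following sketch against a raw org.I0Itec.zkclient.ZkClient (the path, payload and timeout are placeholders, it assumes the client was built with Kafka's string serializer, and the real code goes through ZkUtils.createEphemeralPathExpectConflict):

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

// Illustrative sketch only -- not part of this patch.
def registerWithRetry(zkClient: ZkClient, path: String, payload: String, sessionTimeoutMs: Int) {
  while (true) {
    try {
      zkClient.createEphemeral(path, payload)
      return // registration succeeded
    } catch {
      case e: ZkNodeExistsException =>
        // The ephemeral node from our expired session may not have been deleted yet.
        val existing: String = zkClient.readData(path, true) // true => null if the path is gone
        if (existing == null) {
          // The stale node just disappeared; retry immediately.
        } else {
          // Assume the conflicting node is our own leftover: wait roughly one session
          // timeout for ZooKeeper to expire it, then try again.
          Thread.sleep(sessionTimeoutMs)
        }
    }
  }
}

Backing off for a full session timeout is the key design choice: that is the longest ZooKeeper should take to delete the old ephemeral node once its session is gone.
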
http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index c87caab..800f900 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,6 +37,7 @@ import scala.Some
 import kafka.common.TopicAndPartition
 
 class ControllerContext(val zkClient: ZkClient,
+                        val zkSessionTimeout: Int,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
@@ -83,7 +84,7 @@ object KafkaController {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
-  val controllerContext = new ControllerContext(zkClient)
+  val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 574922b..d785db9 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,10 +17,11 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.Logging
+import kafka.utils.{Json, Utils, SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
+import kafka.common.KafkaException
 
 /**
  * This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -46,23 +47,63 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
 
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
-    try {
-      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
-      info(brokerId + " successfully elected as leader")
-      leaderId = brokerId
-      onBecomingLeader()
-    } catch {
-      case e: ZkNodeExistsException =>
-        // If someone else has written the path, then
-        val data: String = controllerContext.zkClient.readData(electionPath, true)
-        debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
-        if (data != null) {
-          leaderId = data.toInt
-        }
-      case e2 =>
-        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
-        resign()
-    }
+    val timestamp = SystemTime.milliseconds.toString
+    val electString =
+      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
+        ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
+
+    var electNotDone = true
+    do {
+      electNotDone = false
+      try {
+        createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString)
+
+        info(brokerId + " successfully elected as leader")
+        leaderId = brokerId
+        onBecomingLeader()
+      } catch {
+        case e: ZkNodeExistsException =>
+          readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+          // If someone else has written the path, then read the broker id
+            case Some(controllerString) =>
+              try {
+                Json.parseFull(controllerString) match {
+                  case Some(m) =>
+                    val controllerInfo = m.asInstanceOf[Map[String, Any]]
+                    leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
+                    if (leaderId != brokerId) {
+                      info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+                    } else {
+                      info("I wrote this conflicted ephemeral node a while back in a different session, "
+                        + "hence I will retry")
+                      electNotDone = true
+                      Thread.sleep(controllerContext.zkSessionTimeout)
+                    }
+                  case None =>
+                    warn("Error while reading leader info %s on broker %d, may be it is an old version".format(controllerString, brokerId))
+                    throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. May be it is an old version")
+                }
+              } catch {
+                case t =>
+                  // It may be due to an incompatible controller register version
+                  info("Failed to parse the controller info as json. " +
+                    "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerString))
+                  try {
+                    leaderId = controllerString.toInt
+                    info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+                  } catch {
+                    case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
+                  }
+              }
+            case None =>
+              // The node disappears, retry immediately
+          }
+        case e2 =>
+          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+          resign()
+      }
+    } while (electNotDone)
+
     amILeader
   }
 
@@ -88,8 +129,25 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       controllerContext.controllerLock synchronized {
-        leaderId = data.toString.toInt
-        info("New leader is %d".format(leaderId))
+        try {
+          Json.parseFull(data.toString) match {
+            case Some(m) =>
+              val controllerInfo = m.asInstanceOf[Map[String, Any]]
+              leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
+              info("New leader is %d".format(leaderId))
+            case None =>
+              error("Error while reading the leader info %s".format(data.toString))
+          }
+        } catch {
+          case t =>
+            // It may be due to an incompatible controller register version
+            try {
+              leaderId = data.toString.toInt
+              info("New leader is %d".format(leaderId))
+            } catch {
+              case t => throw new KafkaException("Failed to parse the leader info from zookeeper: " + data, t)
+            }
+        }
       }
     }
 

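With this change the election znode stores a small JSON payload ({"version":1,"brokerid":<id>,"timestamp":"<ms>"}) instead of a bare broker id, so both elect and the data-change listener first parse the new format and fall back to the legacy plain-integer payload written by older brokers. A minimal self-contained sketch of that fallback, using scala.util.parsing.json from the standard library instead of kafka.utils.Json (which is why the numeric field comes back as a Double here):

import scala.util.parsing.json.JSON

// Illustrative sketch only -- the patch itself goes through kafka.utils.Json.parseFull.
def parseControllerId(data: String): Int =
  JSON.parseFull(data) match {
    case Some(m: Map[_, _]) =>
      // New format: {"version":1,"brokerid":<id>,"timestamp":"<ms>"}
      m.asInstanceOf[Map[String, Any]]("brokerid").toString.toDouble.toInt
    case _ =>
      // Legacy format: the znode holds nothing but the broker id as text.
      // (If it is neither, toInt throws, mirroring the KafkaException above.)
      data.toInt
  }

// Both payloads resolve to broker 2:
//   parseControllerId("""{"version":1,"brokerid":2,"timestamp":"1376088240000"}""")  // 2
//   parseControllerId("2")                                                           // 2
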
http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0072a1a..9772af8 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -54,7 +54,25 @@ object ZkUtils extends Logging {
 
   def getController(zkClient: ZkClient): Int= {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
-      case Some(controller) => controller.toInt
+      case Some(controller) =>
+        try {
+          Json.parseFull(controller) match {
+            case Some(m) =>
+              val controllerInfo = m.asInstanceOf[Map[String, Any]]
+              controllerInfo.get("brokerid").get.asInstanceOf[Int]
+            case None => throw new KafkaException("Failed to parse the controller info json [%s] from zookeeper.".format(controller))
+          }
+        } catch {
+          case t =>
+            // It may be due to an incompatible controller register version
+            info("Failed to parse the controller info as json. " +
+              "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controller))
+            try {
+              controller.toInt
+            } catch {
+              case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
+            }
+        }
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }

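getController now accepts both payload formats as well, so anything holding a ZkClient handle can still resolve the current controller during a rolling upgrade. A hypothetical ad-hoc check, not part of this patch (the connect string and timeouts are made up, and it assumes kafka.utils.ZKStringSerializer, the serializer Kafka uses for its znodes):

import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZkUtils, ZKStringSerializer}

// Illustrative sketch only -- prints the broker id of the current controller.
object WhoIsController {
  def main(args: Array[String]) {
    val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
    try {
      // Works whether /controller holds the new JSON payload or the legacy bare integer,
      // thanks to the fallback added in the hunk above.
      println("Current controller: broker " + ZkUtils.getController(zkClient))
    } finally {
      zkClient.close()
    }
  }
}
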
http://git-wip-us.apache.org/repos/asf/kafka/blob/1db824ed/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index c4328f0..70e4b51 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -124,7 +124,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val controllerId = 2
     val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
-    val controllerContext = new ControllerContext(zkClient)
+    val controllerContext = new ControllerContext(zkClient, 6000)
     controllerContext.liveBrokers = brokers.toSet
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
     controllerChannelManager.startup()