You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:11 UTC

[26/36] git commit: kafka-992; follow-up patch; Double Check on Broker Registration to Avoid False NodeExist Exception; patched by Guozhang Wang; reviewed by Neha Narkhede and Jun Rao

kafka-992; follow-up patch; Double Check on Broker Registration to Avoid False NodeExist Exception; patched by Guozhang Wang; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6849da05
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6849da05
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6849da05

Branch: refs/heads/trunk
Commit: 6849da0505da1f38edfbfb64e27cff504231142a
Parents: 7a9faa4
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Aug 16 21:51:35 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 16 21:51:35 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Broker.scala  |   2 -
 .../consumer/ZookeeperConsumerConnector.scala   |  24 +----
 .../kafka/controller/KafkaController.scala      |  25 ++++-
 .../kafka/server/ZookeeperLeaderElector.scala   |  89 +++++-----------
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 101 ++++++++++---------
 5 files changed, 103 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 435c473..b03dea2 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -58,8 +58,6 @@ private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
   
   override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
 
-  def getZkString(): String = host + ":" + port
-
   def getConnectionString(): String = host + ":" + port
 
   def writeTo(buffer: ByteBuffer) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c2b9b9a..e7a692a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -220,28 +220,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
                              ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
 
-    while (true) {
-      try {
-        createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
-
-        info("end registering consumer " + consumerIdString + " in ZK")
-        return
-      } catch {
-        case e: ZkNodeExistsException => {
-          // An ephemeral node may still exist even after its corresponding session has expired
-          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
-          // and hence the write succeeds without ZkNodeExistsException
-          ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match {
-            case Some(consumerZKString) => {
-              info("I wrote this conflicted ephemeral node a while back in a different session, "
-                + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
-              Thread.sleep(config.zkSessionTimeoutMs)
-            }
-            case None => // the node disappeared; retry creating the ephemeral node immediately
-          }
-        }
-      }
-    }
+    createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
+    info("end registering consumer " + consumerIdString + " in ZK")
   }
 
   private def sendShutdownToAllQueues() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 800f900..bde405a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,7 +28,7 @@ import kafka.common._
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
-import kafka.utils.{Utils, ZkUtils, Logging}
+import kafka.utils.{Json, Utils, ZkUtils, Logging}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
@@ -74,11 +74,32 @@ trait KafkaControllerMBean {
   def shutdownBroker(id: Int): Set[TopicAndPartition]
 }
 
-object KafkaController {
+object KafkaController extends Logging {
   val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
   val stateChangeLogger = "state.change.logger"
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
+  /** Parses the broker id from controller registration data: JSON with a "brokerid" field, or the legacy plain integer. */
+  def parseControllerId(controllerInfoString: String): Int = {
+    try {
+      Json.parseFull(controllerInfoString) match {
+        case Some(m) =>
+          val controllerInfo = m.asInstanceOf[Map[String, Any]]
+          controllerInfo.get("brokerid").get.asInstanceOf[Int]
+        case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
+      }
+    } catch {
+      case t: Exception => // non-fatal only; also covers the None-branch KafkaException above
+        // It may be due to an incompatible controller register version
+        warn("Failed to parse the controller info as json. "
+          + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
+        try {
+          controllerInfoString.toInt
+        } catch {
+          case e: NumberFormatException => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", e)
+        }
+    }
+  }
 }
 
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index d785db9..50e3f79 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,10 +17,11 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, SystemTime, Logging}
+import kafka.utils.{Utils, SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
+import kafka.controller.KafkaController
 import kafka.common.KafkaException
 
 /**
@@ -52,57 +53,29 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
         ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
 
-    var electNotDone = true
-    do {
-      electNotDone = false
-      try {
-        createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString)
-
-        info(brokerId + " successfully elected as leader")
-        leaderId = brokerId
-        onBecomingLeader()
+    try {
+      createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId, // a stale node is ours only if it names this broker
+        (controllerString : String, expectedId : Any) => KafkaController.parseControllerId(controllerString) == expectedId.asInstanceOf[Int],
+        controllerContext.zkSessionTimeout)
+      info(brokerId + " successfully elected as leader")
+      leaderId = brokerId
+      onBecomingLeader()
       } catch {
         case e: ZkNodeExistsException =>
-          readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
-          // If someone else has written the path, then read the broker id
-            case Some(controllerString) =>
-              try {
-                Json.parseFull(controllerString) match {
-                  case Some(m) =>
-                    val controllerInfo = m.asInstanceOf[Map[String, Any]]
-                    leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
-                    if (leaderId != brokerId) {
-                      info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-                    } else {
-                      info("I wrote this conflicted ephemeral node a while back in a different session, "
-                        + "hence I will retry")
-                      electNotDone = true
-                      Thread.sleep(controllerContext.zkSessionTimeout)
-                    }
-                  case None =>
-                    warn("Error while reading leader info %s on broker %d, may be it is an old version".format(controllerString, brokerId))
-                    throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. May be it is an old version")
-                }
-              } catch {
-                case t =>
-                  // It may be due to an incompatible controller register version
-                  info("Failed to parse the controller info as json. " +
-                    "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerString))
-                  try {
-                    leaderId = controllerString.toInt
-                    info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-                  } catch {
-                    case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
-                  }
-              }
-            case None =>
-              // The node disappears, retry immediately
+          // If someone else has written the path, then read the current leader's broker id from it
+          leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+            case Some(controller) => KafkaController.parseControllerId(controller)
+            case None => {
+              warn("A leader has been elected but just resigned, this will result in another round of election")
+              -1 // sentinel checked below: no leader id could be read
+            }
           }
+          if (leaderId != -1)
+            debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
         case e2 =>
           error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
-          resign()
-      }
-    } while (electNotDone)
+          leaderId = -1
+    }
 
     amILeader
   }
@@ -114,6 +87,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
   def amILeader : Boolean = leaderId == brokerId
 
   def resign() = {
+    leaderId = -1
     deletePath(controllerContext.zkClient, electionPath)
   }
 
@@ -129,25 +103,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       controllerContext.controllerLock synchronized {
-        try {
-          Json.parseFull(data.toString) match {
-            case Some(m) =>
-              val controllerInfo = m.asInstanceOf[Map[String, Any]]
-              leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
-              info("New leader is %d".format(leaderId))
-            case None =>
-              error("Error while reading the leader info %s".format(data.toString))
-          }
-        } catch {
-          case t =>
-            // It may be due to an incompatible controller register version
-            try {
-              leaderId = data.toString.toInt
-              info("New leader is %d".format(leaderId))
-            } catch {
-              case t => throw new KafkaException("Failed to parse the leader info from zookeeper: " + data, t)
-            }
-        }
+        leaderId = KafkaController.parseControllerId(data.toString)
+        info("New leader is %d".format(leaderId))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9772af8..ba5eacc 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -31,6 +31,7 @@ import kafka.admin._
 import kafka.common.{KafkaException, NoEpochForPartitionException}
 import kafka.controller.ReassignedPartitionsContext
 import kafka.controller.PartitionAndReplica
+import kafka.controller.KafkaController
 import scala.Some
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.common.TopicAndPartition
@@ -54,25 +55,7 @@ object ZkUtils extends Logging {
 
   def getController(zkClient: ZkClient): Int= {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
-      case Some(controller) =>
-        try {
-          Json.parseFull(controller) match {
-            case Some(m) =>
-              val controllerInfo = m.asInstanceOf[Map[String, Any]]
-              controllerInfo.get("brokerid").get.asInstanceOf[Int]
-            case None => throw new KafkaException("Failed to parse the controller info json [%s] from zookeeper.".format(controller))
-          }
-        } catch {
-          case t =>
-            // It may be due to an incompatible controller register version
-            info("Failed to parse the controller info as json. " +
-              "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controller))
-            try {
-              controller.toInt
-            } catch {
-              case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
-            }
-        }
+      case Some(controller) => KafkaController.parseControllerId(controller)
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }
@@ -206,36 +189,21 @@ object ZkUtils extends Logging {
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
                              Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
                                                    valueInQuotes = false))
+    val expectedBroker = new Broker(id, host, port)
 
-    while (true) {
-      try {
-        createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
+    try {
+      createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker,
+        (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]),
+        timeout)
 
-        info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
-        return
-      } catch {
-        case e: ZkNodeExistsException => {
-          // An ephemeral node may still exist even after its corresponding session has expired
-          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
-          // and hence the write succeeds without ZkNodeExistsException
-          ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + id)._1 match {
-            case Some(brokerZKString) => {
-              val broker = Broker.createBroker(id, brokerZKString)
-              if (broker.host == host && broker.port == port) {
-                info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString)
-                  + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
-                Thread.sleep(timeout)
-              } else {
-                // otherwise, throw the runtime exception
-                throw new RuntimeException("Another broker [%s:%s] other than the current broker [%s:%s] is already registered on the path %s."
-                  .format(broker.host, broker.port, host, port, brokerIdPath))
-              }
-            }
-            case None => // the node disappeared; retry creating the ephemeral node immediately
-          }
-        }
-      }
+    } catch {
+      case e: ZkNodeExistsException => // escapes only when the checker finds a different broker (id/host/port) on the path
+        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
+          + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
+          + "else you have shutdown this broker and restarted it faster than the zookeeper "
+          + "timeout so it appears to be re-registering.", e)
     }
+    info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
   }
 
   def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
@@ -318,6 +286,47 @@ object ZkUtils extends Logging {
   }
 
   /**
+   * Create an ephemeral node with the given path and data, working around
+   * https://issues.apache.org/jira/browse/ZOOKEEPER-1740, where an ephemeral node may
+   * still exist even after the session that created it has expired.
+   *
+   * On ZkNodeExistsException, the data currently stored at path is read and passed, together
+   * with expectedCallerData, to checker. If checker returns true the node is assumed to be a
+   * stale leftover written by this caller: sleep backoffTime ms and retry. If the node has
+   * disappeared in the meantime, retry immediately. Otherwise re-throw the ZkNodeExistsException.
+   *
+   * @param checker (data read from path, expectedCallerData) => whether this caller wrote that data
+   */
+  def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, expectedCallerData: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = {
+    while (true) {
+      try {
+        createEphemeralPathExpectConflict(zkClient, path, data)
+        return
+      } catch {
+        case e: ZkNodeExistsException => {
+          // An ephemeral node may still exist even after its corresponding session has expired
+          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
+          // and hence the write succeeds without ZkNodeExistsException
+          ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
+            case Some(writtenData) => {
+              if (checker(writtenData, expectedCallerData)) {
+                // log the data actually found at the path, not the data this call is trying to write
+                info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(writtenData, path)
+                  + "hence I will backoff for this node to be deleted by Zookeeper and retry")
+                Thread.sleep(backoffTime)
+              } else {
+                throw e
+              }
+            }
+            case None => // the node disappeared; retry creating the ephemeral node immediately
+          }
+        }
+        // any other exception propagates to the caller unchanged
+      }
+    }
+  }
+
+  /**
    * Create an persistent node with the given path and data. Create parents if necessary.
    */
   def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {