You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:29 UTC

[32/37] git commit: kafka-1451; Broker stuck due to leader election race; patched by Manikumar Reddy; reviewed by Jun Rao

kafka-1451; Broker stuck due to leader election race; patched by Manikumar Reddy; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a01a101e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a01a101e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a01a101e

Branch: refs/heads/transactional_messaging
Commit: a01a101e82d5b06e89857e79c4b8268589d81fca
Parents: 50f2b24
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Wed Jul 30 08:14:41 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jul 30 08:14:41 2014 -0700

----------------------------------------------------------------------
 .../kafka/server/ZookeeperLeaderElector.scala   | 30 +++++++++++++++-----
 1 file changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a01a101e/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index e5b6ff1..a75818a 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -50,9 +50,27 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
     }
   }
 
+  private def getControllerID(): Int = {
+    readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+       case Some(controller) => KafkaController.parseControllerId(controller)
+       case None => -1
+    }
+  }
+    
   def elect: Boolean = {
     val timestamp = SystemTime.milliseconds.toString
     val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
+   
+   leaderId = getControllerID 
+    /* 
+     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, 
+     * it's possible that the controller has already been elected when we get here. This check will prevent the following 
+     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
+     */
+    if(leaderId != -1) {
+       debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
+       return amILeader
+    }
 
     try {
       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
@@ -64,15 +82,13 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
     } catch {
       case e: ZkNodeExistsException =>
         // If someone else has written the path, then
-        leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
-          case Some(controller) => KafkaController.parseControllerId(controller)
-          case None => {
-            warn("A leader has been elected but just resigned, this will result in another round of election")
-            -1
-          }
-        }
+        leaderId = getControllerID 
+
         if (leaderId != -1)
           debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+        else
+          warn("A leader has been elected but just resigned, this will result in another round of election")
+
       case e2: Throwable =>
         error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
         resign()