You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:29 UTC
[32/37] git commit: kafka-1451; Broker stuck due to leader election race; patched by Manikumar Reddy; reviewed by Jun Rao
kafka-1451; Broker stuck due to leader election race; patched by Manikumar Reddy; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a01a101e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a01a101e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a01a101e
Branch: refs/heads/transactional_messaging
Commit: a01a101e82d5b06e89857e79c4b8268589d81fca
Parents: 50f2b24
Author: Manikumar Reddy <ma...@gmail.com>
Authored: Wed Jul 30 08:14:41 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jul 30 08:14:41 2014 -0700
----------------------------------------------------------------------
.../kafka/server/ZookeeperLeaderElector.scala | 30 +++++++++++++++-----
1 file changed, 23 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a01a101e/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index e5b6ff1..a75818a 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -50,9 +50,27 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
}
}
+ private def getControllerID(): Int = {
+ readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+ case Some(controller) => KafkaController.parseControllerId(controller)
+ case None => -1
+ }
+ }
+
def elect: Boolean = {
val timestamp = SystemTime.milliseconds.toString
val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
+
+ leaderId = getControllerID
+ /*
+ * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
+ * it's possible that the controller has already been elected when we get here. This check will prevent the following
+ * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
+ */
+ if(leaderId != -1) {
+ debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
+ return amILeader
+ }
try {
createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
@@ -64,15 +82,13 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
} catch {
case e: ZkNodeExistsException =>
// If someone else has written the path, then
- leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
- case Some(controller) => KafkaController.parseControllerId(controller)
- case None => {
- warn("A leader has been elected but just resigned, this will result in another round of election")
- -1
- }
- }
+ leaderId = getControllerID
+
if (leaderId != -1)
debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
+ else
+ warn("A leader has been elected but just resigned, this will result in another round of election")
+
case e2: Throwable =>
error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
resign()