You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:14 UTC
[29/36] git commit: KAFKA-1029 Zookeeper leader election stuck in ephemeral node retry loop; reviewed by Neha and Guozhang
KAFKA-1029 Zookeeper leader election stuck in ephemeral node retry loop; reviewed by Neha and Guozhang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7640bee3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7640bee3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7640bee3
Branch: refs/heads/trunk
Commit: 7640bee3d281e0658f97997738a1f1f599fd4c07
Parents: d217f4c
Author: Sam Meder <sa...@gmail.com>
Authored: Tue Aug 27 10:16:33 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Aug 27 10:16:56 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7640bee3/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 50e3f79..f1f0625 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -42,19 +42,19 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
def startup {
controllerContext.controllerLock synchronized {
+ controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
elect
}
}
def elect: Boolean = {
- controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
val timestamp = SystemTime.milliseconds.toString
val electString =
Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
try {
- createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, leaderId,
+ createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
(controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
controllerContext.zkSessionTimeout)
info(brokerId + " successfully elected as leader")