You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:11 UTC
[26/36] git commit: kafka-992; follow-up patch;
Double Check on Broker Registration to Avoid False NodeExist
Exception; patched by Guozhang Wang; reviewed by Neha Narkhede and Jun Rao
kafka-992; follow-up patch; Double Check on Broker Registration to Avoid False NodeExist Exception; patched by Guozhang Wang; reviewed by Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6849da05
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6849da05
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6849da05
Branch: refs/heads/trunk
Commit: 6849da0505da1f38edfbfb64e27cff504231142a
Parents: 7a9faa4
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Aug 16 21:51:35 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 16 21:51:35 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/cluster/Broker.scala | 2 -
.../consumer/ZookeeperConsumerConnector.scala | 24 +----
.../kafka/controller/KafkaController.scala | 25 ++++-
.../kafka/server/ZookeeperLeaderElector.scala | 89 +++++-----------
core/src/main/scala/kafka/utils/ZkUtils.scala | 101 ++++++++++---------
5 files changed, 103 insertions(+), 138 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
Note: this hunk drops Broker.getZkString(), which returned the same host + ":" + port string as getConnectionString().
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 435c473..b03dea2 100644
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -58,8 +58,6 @@ private[kafka] case class Broker(val id: Int, val host: String, val port: Int) {
override def toString(): String = new String("id:" + id + ",host:" + host + ",port:" + port)
- def getZkString(): String = host + ":" + port
-
def getConnectionString(): String = host + ":" + port
def writeTo(buffer: ByteBuffer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c2b9b9a..e7a692a 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -220,28 +220,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
- while (true) {
- try {
- createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
-
- info("end registering consumer " + consumerIdString + " in ZK")
- return
- } catch {
- case e: ZkNodeExistsException => {
- // An ephemeral node may still exist even after its corresponding session has expired
- // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
- // and hence the write succeeds without ZkNodeExistsException
- ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match {
- case Some(consumerZKString) => {
- info("I wrote this conflicted ephemeral node a while back in a different session, "
- + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
- Thread.sleep(config.zkSessionTimeoutMs)
- }
- case None => // the node disappeared; retry creating the ephemeral node immediately
- }
- }
- }
- }
+ createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null, (consumerZKString, consumer) => true, config.zkSessionTimeoutMs) // null expected data + always-true checker: any existing node at this path is assumed to be left over from this consumer's previous session, so the helper backs off for zkSessionTimeoutMs and retries (same as the removed loop)
+ info("end registering consumer " + consumerIdString + " in ZK")
}
private def sendShutdownToAllQueues() = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 800f900..bde405a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -28,7 +28,7 @@ import kafka.common._
import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
import kafka.utils.ZkUtils._
-import kafka.utils.{Utils, ZkUtils, Logging}
+import kafka.utils.{Json, Utils, ZkUtils, Logging}
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
@@ -74,11 +74,32 @@ trait KafkaControllerMBean {
def shutdownBroker(id: Int): Set[TopicAndPartition]
}
-object KafkaController {
+object KafkaController extends Logging { // Logging is mixed in so parseControllerId can call warn()
val MBeanName = "kafka.controller:type=KafkaController,name=ControllerOps"
val stateChangeLogger = "state.change.logger"
val InitialControllerEpoch = 1
val InitialControllerEpochZkVersion = 1
+
+ def parseControllerId(controllerInfoString: String): Int = { // Returns the controller's broker id from either the JSON format ({"brokerid": ...}) or the legacy plain-integer format; throws KafkaException if neither parses
+ try {
+ Json.parseFull(controllerInfoString) match {
+ case Some(m) =>
+ val controllerInfo = m.asInstanceOf[Map[String, Any]]
+ return controllerInfo.get("brokerid").get.asInstanceOf[Int] // a missing "brokerid" key throws here and is handled by the legacy-format fallback below
+ case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
+ }
+ } catch {
+ case t => // any failure above, including the KafkaException for unparsable JSON, falls through to the legacy plain-integer format
+ // It may be due to an incompatible controller register version
+ warn("Failed to parse the controller info as json. "
+ + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
+ try {
+ return controllerInfoString.toInt
+ } catch {
+ case t => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+ }
+ }
+ }
}
class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index d785db9..50e3f79 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,10 +17,11 @@
package kafka.server
import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, SystemTime, Logging}
+import kafka.utils.{Utils, SystemTime, Logging}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.IZkDataListener
import kafka.controller.ControllerContext
+import kafka.controller.KafkaController
import kafka.common.KafkaException
/**
@@ -52,57 +53,29 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
- var electNotDone = true
- do {
- electNotDone = false
- try {
- createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString)
-
- info(brokerId + " successfully elected as leader")
- leaderId = brokerId
- onBecomingLeader()
+ try {
+ createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId, // expect OUR id: an existing node holding it is a stale node from our previous session (ZOOKEEPER-1740), so back off and retry; the current leaderId field (usually -1 here) would never match
+ (controllerString : String, expectedLeaderId : Any) => KafkaController.parseControllerId(controllerString) == expectedLeaderId.asInstanceOf[Int],
+ controllerContext.zkSessionTimeout)
+ info(brokerId + " successfully elected as leader")
+ leaderId = brokerId
+ onBecomingLeader()
} catch {
case e: ZkNodeExistsException =>
- readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
- // If someone else has written the path, then read the broker id
- case Some(controllerString) =>
- try {
- Json.parseFull(controllerString) match {
- case Some(m) =>
- val controllerInfo = m.asInstanceOf[Map[String, Any]]
- leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
- if (leaderId != brokerId) {
- info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
- } else {
- info("I wrote this conflicted ephemeral node a while back in a different session, "
- + "hence I will retry")
- electNotDone = true
- Thread.sleep(controllerContext.zkSessionTimeout)
- }
- case None =>
- warn("Error while reading leader info %s on broker %d, may be it is an old version".format(controllerString, brokerId))
- throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. May be it is an old version")
- }
- } catch {
- case t =>
- // It may be due to an incompatible controller register version
- info("Failed to parse the controller info as json. " +
- "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerString))
- try {
- leaderId = controllerString.toInt
- info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
- } catch {
- case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
- }
- }
- case None =>
- // The node disappears, retry immediately
+ // If another broker has written the path, read the elected leader's id from it (-1 if the node has already gone)
+ leaderId = readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
+ case Some(controller) => KafkaController.parseControllerId(controller)
+ case None => {
+ warn("A leader has been elected but just resigned, this will result in another round of election")
+ -1
+ }
}
+ if (leaderId != -1)
+ debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
case e2 =>
error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
- resign()
- }
- } while (electNotDone)
+ resign() // resets leaderId to -1 AND deletes our election node, instead of leaving a node that claims leadership we failed to assume
+ }
amILeader
}
@@ -114,6 +87,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
def amILeader : Boolean = leaderId == brokerId
def resign() = {
+ leaderId = -1
deletePath(controllerContext.zkClient, electionPath)
}
@@ -129,25 +103,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
controllerContext.controllerLock synchronized {
- try {
- Json.parseFull(data.toString) match {
- case Some(m) =>
- val controllerInfo = m.asInstanceOf[Map[String, Any]]
- leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
- info("New leader is %d".format(leaderId))
- case None =>
- error("Error while reading the leader info %s".format(data.toString))
- }
- } catch {
- case t =>
- // It may be due to an incompatible controller register version
- try {
- leaderId = data.toString.toInt
- info("New leader is %d".format(leaderId))
- } catch {
- case t => throw new KafkaException("Failed to parse the leader info from zookeeper: " + data, t)
- }
- }
+ leaderId = KafkaController.parseControllerId(data.toString)
+ info("New leader is %d".format(leaderId))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/6849da05/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 9772af8..ba5eacc 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -31,6 +31,7 @@ import kafka.admin._
import kafka.common.{KafkaException, NoEpochForPartitionException}
import kafka.controller.ReassignedPartitionsContext
import kafka.controller.PartitionAndReplica
+import kafka.controller.KafkaController // for KafkaController.parseControllerId in getController
import scala.Some
import kafka.controller.LeaderIsrAndControllerEpoch
import kafka.common.TopicAndPartition
@@ -54,25 +55,7 @@ object ZkUtils extends Logging {
def getController(zkClient: ZkClient): Int= {
readDataMaybeNull(zkClient, ControllerPath)._1 match {
- case Some(controller) =>
- try {
- Json.parseFull(controller) match {
- case Some(m) =>
- val controllerInfo = m.asInstanceOf[Map[String, Any]]
- controllerInfo.get("brokerid").get.asInstanceOf[Int]
- case None => throw new KafkaException("Failed to parse the controller info json [%s] from zookeeper.".format(controller))
- }
- } catch {
- case t =>
- // It may be due to an incompatible controller register version
- info("Failed to parse the controller info as json. " +
- "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controller))
- try {
- controller.toInt
- } catch {
- case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
- }
- }
+ case Some(controller) => KafkaController.parseControllerId(controller) // accepts both the JSON and the legacy plain-integer controller formats
case None => throw new KafkaException("Controller doesn't exist")
}
}
@@ -206,36 +189,21 @@ object ZkUtils extends Logging {
Utils.mergeJsonFields(Utils.mapToJsonFields(Map("host" -> host), valueInQuotes = true) ++
Utils.mapToJsonFields(Map("version" -> 1.toString, "jmx_port" -> jmxPort.toString, "port" -> port.toString, "timestamp" -> timestamp),
valueInQuotes = false))
+ val expectedBroker = new Broker(id, host, port) // an existing node that parses to a Broker equal to this one is treated as our own stale registration and retried
- while (true) {
- try {
- createEphemeralPathExpectConflict(zkClient, brokerIdPath, brokerInfo)
+ try {
+ createEphemeralPathExpectConflictHandleZKBug(zkClient, brokerIdPath, brokerInfo, expectedBroker,
+ (brokerString: String, broker: Any) => Broker.createBroker(broker.asInstanceOf[Broker].id, brokerString).equals(broker.asInstanceOf[Broker]),
+ timeout)
- info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
- return
- } catch {
- case e: ZkNodeExistsException => {
- // An ephemeral node may still exist even after its corresponding session has expired
- // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
- // and hence the write succeeds without ZkNodeExistsException
- ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + id)._1 match {
- case Some(brokerZKString) => {
- val broker = Broker.createBroker(id, brokerZKString)
- if (broker.host == host && broker.port == port) {
- info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString)
- + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
- Thread.sleep(timeout)
- } else {
- // otherwise, throw the runtime exception
- throw new RuntimeException("Another broker [%s:%s] other than the current broker [%s:%s] is already registered on the path %s."
- .format(broker.host, broker.port, host, port, brokerIdPath))
- }
- }
- case None => // the node disappeared; retry creating the ephemeral node immediately
- }
- }
- }
+ } catch {
+ case e: ZkNodeExistsException => // the helper rethrows only when the existing node does not match expectedBroker; NOTE(review): matching re-registrations are now retried, so the "restarted faster than the zookeeper timeout" hint below only applies if host/port changed
+ throw new RuntimeException("A broker is already registered on the path " + brokerIdPath
+ + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or "
+ + "else you have shutdown this broker and restarted it faster than the zookeeper "
+ + "timeout so it appears to be re-registering.")
}
+ info("Registered broker %d at path %s with address %s:%d.".format(id, brokerIdPath, host, port))
}
def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
@@ -318,6 +286,47 @@ object ZkUtils extends Logging {
}
/**
+ * Create an ephemeral node with the given path and data.
+ * Throw ZkNodeExistsException if the node already exists and the checker does not attribute it to the caller.
+ * Handles the following ZK session timeout bug:
+ *
+ * https://issues.apache.org/jira/browse/ZOOKEEPER-1740
+ *
+ * Upon receiving a ZkNodeExistsException, read the data from the conflicted path and
+ * call checker(readData, expectedCallerData).
+ * If the checker returns true, the above bug may have been hit: back off for backoffTime ms and retry;
+ * otherwise rethrow the exception. If the node vanished in the meantime, retry immediately.
+ */
+ def createEphemeralPathExpectConflictHandleZKBug(zkClient: ZkClient, path: String, data: String, expectedCallerData: Any, checker: (String, Any) => Boolean, backoffTime: Int): Unit = {
+ while (true) {
+ try {
+ createEphemeralPathExpectConflict(zkClient, path, data)
+ return
+ } catch {
+ case e: ZkNodeExistsException => {
+ // An ephemeral node may still exist even after its corresponding session has expired
+ // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
+ // and hence the write succeeds without ZkNodeExistsException
+ ZkUtils.readDataMaybeNull(zkClient, path)._1 match {
+ case Some(writtenData) => {
+ if (checker(writtenData, expectedCallerData)) {
+ info("I wrote this conflicted ephemeral node [%s] at %s a while back in a different session, ".format(writtenData, path)
+ + "hence I will backoff for this node to be deleted by Zookeeper and retry")
+
+ Thread.sleep(backoffTime)
+ } else {
+ throw e
+ }
+ }
+ case None => // the node disappeared; retry creating the ephemeral node immediately
+ }
+ }
+ // Any other exception is not caught here and propagates to the caller unchanged.
+ }
+ }
+ }
+
+ /**
* Create an persistent node with the given path and data. Create parents if necessary.
*/
def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {