You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/09/11 19:04:08 UTC

[23/36] git commit: Revert "KAFKA-992 follow up: Fix the zookeeper de-registration issue for controller and consumer; reviewed by Neha Narkhede"

Revert "KAFKA-992 follow up: Fix the zookeeper de-registration issue for controller and consumer; reviewed by Neha Narkhede"

This reverts commit 81c49bbdae5e490f9d5dc7b042ee60e617fbb22b.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d75e093
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d75e093
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d75e093

Branch: refs/heads/trunk
Commit: 1d75e09313b5b3f1cde6c39fdda20bc7185cfdf6
Parents: 81c49bb
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Aug 9 15:29:08 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Aug 9 15:29:08 2013 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 30 +-----
 .../kafka/controller/KafkaController.scala      |  3 +-
 .../kafka/server/ZookeeperLeaderElector.scala   | 97 ++++----------------
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 22 +----
 .../unit/kafka/server/LeaderElectionTest.scala  |  2 +-
 5 files changed, 28 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 17977e7..0ca2850 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -213,35 +213,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // this API is used by unit tests only
   def getTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]] = topicRegistry
 
-  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) {
+  private def registerConsumerInZK(dirs: ZKGroupDirs, consumerIdString: String, topicCount: TopicCount) = {
     info("begin registering consumer " + consumerIdString + " in ZK")
-    val timestamp = SystemTime.milliseconds.toString
     val consumerRegistrationInfo =
       Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "subscription" -> topicCount.dbString), valueInQuotes = false)
-                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern, "timestamp" -> timestamp), valueInQuotes = true))
-
-    while (true) {
-      try {
-        createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
-
-        info("end registering consumer " + consumerIdString + " in ZK")
-        return
-      } catch {
-        case e: ZkNodeExistsException => {
-          // An ephemeral node may still exist even after its corresponding session has expired
-          // due to a Zookeeper bug, in this case we need to retry writing until the previous node is deleted
-          // and hence the write succeeds without ZkNodeExistsException
-          ZkUtils.readDataMaybeNull(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString)._1 match {
-            case Some(consumerZKString) => {
-              info("I wrote this conflicted ephemeral node a while back in a different session, "
-                + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
-              Thread.sleep(config.zkSessionTimeoutMs)
-            }
-            case None => // the node disappeared; retry creating the ephemeral node immediately
-          }
-        }
-      }
-    }
+                             ++ Utils.mapToJsonFields(Map("pattern" -> topicCount.pattern), valueInQuotes = true))
+    createEphemeralPathExpectConflict(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo)
+    info("end registering consumer " + consumerIdString + " in ZK")
   }
 
   private def sendShutdownToAllQueues() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 800f900..c87caab 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -37,7 +37,6 @@ import scala.Some
 import kafka.common.TopicAndPartition
 
 class ControllerContext(val zkClient: ZkClient,
-                        val zkSessionTimeout: Int,
                         var controllerChannelManager: ControllerChannelManager = null,
                         val controllerLock: Object = new Object,
                         var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
@@ -84,7 +83,7 @@ object KafkaController {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
-  val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
+  val controllerContext = new ControllerContext(zkClient)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index 6016bd5..574922b 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,11 +17,10 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, SystemTime, Logging}
+import kafka.utils.Logging
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
-import kafka.common.KafkaException
 
 /**
  * This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -47,62 +46,23 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
 
   def elect: Boolean = {
     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
-    val timestamp = SystemTime.milliseconds.toString
-    val electString =
-      Utils.mergeJsonFields(Utils.mapToJsonFields(Map("version" -> 1.toString, "brokerid" -> brokerId.toString), valueInQuotes = false)
-        ++ Utils.mapToJsonFields(Map("timestamp" -> timestamp), valueInQuotes = true))
-
-    var electNotDone = true
-    do {
-      electNotDone = false
-      try {
-        createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, electString)
-
-        info(brokerId + " successfully elected as leader")
-        leaderId = brokerId
-        onBecomingLeader()
-      } catch {
-        case e: ZkNodeExistsException =>
-          readDataMaybeNull(controllerContext.zkClient, electionPath)._1 match {
-          // If someone else has written the path, then read the broker id
-            case Some(controllerString) =>
-              try {
-                Json.parseFull(controllerString) match {
-                  case Some(m) =>
-                    val controllerInfo = m.asInstanceOf[Map[String, Any]]
-                    leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
-                    if (leaderId != brokerId) {
-                      info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-                    } else {
-                      info("I wrote this conflicted ephemeral node a while back in a different session, "
-                        + "hence I will retry")
-                      electNotDone = true
-                      Thread.sleep(controllerContext.zkSessionTimeout)
-                    }
-                  case None =>
-                    error("Error while reading leader info %s on broker %d".format(controllerString, brokerId))
-                    resign()
-                }
-              } catch {
-                case t =>
-                  // It may be due to an incompatible controller register version
-                  info("Failed to parse the controller info as json. " +
-                    "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controllerString))
-                  try {
-                    leaderId = controllerString.toInt
-                    info("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId))
-                  } catch {
-                    case t => throw new KafkaException("Failed to parse the leader info [%s] from zookeeper. This is neither the new or the old format.", t)
-                  }
-              }
-          }
-          // If the node disappear, retry immediately
-        case e2 =>
-          error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
-          resign()
-      }
-    } while (electNotDone)
-
+    try {
+      createEphemeralPathExpectConflict(controllerContext.zkClient, electionPath, brokerId.toString)
+      info(brokerId + " successfully elected as leader")
+      leaderId = brokerId
+      onBecomingLeader()
+    } catch {
+      case e: ZkNodeExistsException =>
+        // If someone else has written the path, then
+        val data: String = controllerContext.zkClient.readData(electionPath, true)
+        debug("Broker %d was elected as leader instead of broker %d".format(data.toInt, brokerId))
+        if (data != null) {
+          leaderId = data.toInt
+        }
+      case e2 =>
+        error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
+        resign()
+    }
     amILeader
   }
 
@@ -128,25 +88,8 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath:
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       controllerContext.controllerLock synchronized {
-        try {
-          Json.parseFull(data.toString) match {
-            case Some(m) =>
-              val controllerInfo = m.asInstanceOf[Map[String, Any]]
-              leaderId = controllerInfo.get("brokerid").get.asInstanceOf[Int]
-              info("New leader is %d".format(leaderId))
-            case None =>
-              error("Error while reading the leader info %s".format(data.toString))
-          }
-        } catch {
-          case t =>
-            // It may be due to an incompatible controller register version
-            try {
-              leaderId = data.toString.toInt
-              info("New leader is %d".format(leaderId))
-            } catch {
-              case t => throw new KafkaException("Failed to parse the leader info from zookeeper: " + data, t)
-            }
-        }
+        leaderId = data.toString.toInt
+        info("New leader is %d".format(leaderId))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 8440d94..0072a1a 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -54,25 +54,7 @@ object ZkUtils extends Logging {
 
   def getController(zkClient: ZkClient): Int= {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
-      case Some(controller) =>
-        try {
-          Json.parseFull(controller) match {
-            case Some(m) =>
-              val controllerInfo = m.asInstanceOf[Map[String, Any]]
-              controllerInfo.get("brokerid").get.asInstanceOf[Int]
-            case None => throw new KafkaException("Failed to parse the controller info json [%s] from zookeeper.".format(controller))
-          }
-        } catch {
-          case t =>
-            // It may be due to an incompatible controller register version
-            info("Failed to parse the controller info as json. " +
-              "Probably this controller is still using the old format [%s] of storing the broker id in the zookeeper path".format(controller))
-            try {
-              controller.toInt
-            } catch {
-              case t => throw new KafkaException("Failed to parse the controller info [%s] from zookeeper. This is neither the new or the old format.", t)
-            }
-        }
+      case Some(controller) => controller.toInt
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }
@@ -222,7 +204,7 @@ object ZkUtils extends Logging {
             case Some(brokerZKString) => {
               val broker = Broker.createBroker(id, brokerZKString)
               if (broker.host == host && broker.port == port) {
-                info("I wrote this conflicted ephemeral node a while back in a different session, "
+                info("I wrote this conflicted ephemeral node [%s] a while back in a different session, ".format(brokerZKString)
                   + "hence I will backoff for this node to be deleted by Zookeeper after session timeout and retry")
                 Thread.sleep(timeout)
               } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1d75e093/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 70e4b51..c4328f0 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -124,7 +124,7 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     val controllerId = 2
     val controllerConfig = new KafkaConfig(TestUtils.createBrokerConfig(controllerId, TestUtils.choosePort()))
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.config.port))
-    val controllerContext = new ControllerContext(zkClient, 6000)
+    val controllerContext = new ControllerContext(zkClient)
     controllerContext.liveBrokers = brokers.toSet
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig)
     controllerChannelManager.startup()