You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/05/29 19:09:43 UTC

git commit: KAFKA-923 Improve controller failover latency. Remove unnecessary zookeeper reads; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 e4f287db6 -> bbb161aa2


KAFKA-923 Improve controller failover latency. Remove unnecessary zookeeper reads; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bbb161aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bbb161aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bbb161aa

Branch: refs/heads/0.8
Commit: bbb161aa250ce3071b03e01d9fa82091f23e1b30
Parents: e4f287d
Author: Neha Narkhede <ne...@apache.org>
Authored: Wed May 29 09:52:23 2013 -0700
Committer: Neha Narkhede <ne...@apache.org>
Committed: Wed May 29 10:05:48 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/controller/KafkaController.scala   |    2 +-
 .../kafka/controller/PartitionStateMachine.scala   |    8 +++-----
 .../kafka/controller/ReplicaStateMachine.scala     |   12 +++++++++++-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   14 ++++++--------
 4 files changed, 21 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f334685..a4e96cc 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -531,7 +531,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def updateLeaderAndIsrCache() {
-    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
+    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.partitionReplicaAssignment.keySet)
     for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
       controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e3af0c3..0f5ebde 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -189,13 +189,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    */
   private def initializePartitionState() {
     for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
-      val topic = topicPartition.topic
-      val partition = topicPartition.partition
       // check if leader and isr path exists for partition. If not, then it is in NEW state
-      ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
-        case Some(currentLeaderAndIsr) =>
+      controllerContext.partitionLeadershipInfo.get(topicPartition) match {
+        case Some(currentLeaderIsrAndEpoch) =>
           // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
-          controllerContext.liveBrokerIds.contains(currentLeaderAndIsr.leader) match {
+          controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match {
             case true => // leader is alive
               partitionState.put(topicPartition, OnlinePartition)
             case false =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index e237805..5283fcd 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -59,7 +59,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     initializeReplicaState()
     hasStarted.set(true)
     // move all Online replicas to Online
-    handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq,
+    handleStateChanges(getAllReplicasOnBroker(controllerContext.allTopics.toSeq,
       controllerContext.liveBrokerIds.toSeq), OnlineReplica)
     info("Started replica state machine with initial state -> " + replicaState.toString())
   }
@@ -229,6 +229,16 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
+  private def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
+    brokerIds.map { brokerId =>
+      val partitionsAssignedToThisBroker =
+        controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic) && p._2.contains(brokerId))
+      if(partitionsAssignedToThisBroker.size == 0)
+        info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
+      partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1.topic, p._1.partition, brokerId))
+    }.flatten.toSet
+  }
+
   def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
     controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 3775eb4..2f5dff6 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -461,15 +461,13 @@ object ZkUtils extends Logging {
     cluster
   }
 
-  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+  def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition])
+  : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
     val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
-    val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
-    for((topic, partitions) <- partitionsForTopics) {
-      for(partition <- partitions) {
-        ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition.toInt) match {
-          case Some(leaderIsrAndControllerEpoch) => ret.put(TopicAndPartition(topic, partition.toInt), leaderIsrAndControllerEpoch)
-          case None =>
-        }
+    for(topicAndPartition <- topicAndPartitions) {
+      ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) match {
+        case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
+        case None =>
       }
     }
     ret