You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/05/29 19:09:43 UTC
git commit: KAFKA-923 Improve controller failover latency. Remove unnecessary zookeeper reads; reviewed by Jun Rao
Updated Branches:
refs/heads/0.8 e4f287db6 -> bbb161aa2
KAFKA-923 Improve controller failover latency. Remove unnecessary zookeeper reads; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bbb161aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bbb161aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bbb161aa
Branch: refs/heads/0.8
Commit: bbb161aa250ce3071b03e01d9fa82091f23e1b30
Parents: e4f287d
Author: Neha Narkhede <ne...@apache.org>
Authored: Wed May 29 09:52:23 2013 -0700
Committer: Neha Narkhede <ne...@apache.org>
Committed: Wed May 29 10:05:48 2013 -0700
----------------------------------------------------------------------
.../scala/kafka/controller/KafkaController.scala | 2 +-
.../kafka/controller/PartitionStateMachine.scala | 8 +++-----
.../kafka/controller/ReplicaStateMachine.scala | 12 +++++++++++-
core/src/main/scala/kafka/utils/ZkUtils.scala | 14 ++++++--------
4 files changed, 21 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index f334685..a4e96cc 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -531,7 +531,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
private def updateLeaderAndIsrCache() {
- val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.allTopics.toSeq)
+ val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.partitionReplicaAssignment.keySet)
for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index e3af0c3..0f5ebde 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -189,13 +189,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
*/
private def initializePartitionState() {
for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
- val topic = topicPartition.topic
- val partition = topicPartition.partition
// check if leader and isr path exists for partition. If not, then it is in NEW state
- ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
- case Some(currentLeaderAndIsr) =>
+ controllerContext.partitionLeadershipInfo.get(topicPartition) match {
+ case Some(currentLeaderIsrAndEpoch) =>
// else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
- controllerContext.liveBrokerIds.contains(currentLeaderAndIsr.leader) match {
+ controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match {
case true => // leader is alive
partitionState.put(topicPartition, OnlinePartition)
case false =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index e237805..5283fcd 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -59,7 +59,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
initializeReplicaState()
hasStarted.set(true)
// move all Online replicas to Online
- handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq,
+ handleStateChanges(getAllReplicasOnBroker(controllerContext.allTopics.toSeq,
controllerContext.liveBrokerIds.toSeq), OnlineReplica)
info("Started replica state machine with initial state -> " + replicaState.toString())
}
@@ -229,6 +229,16 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
}
+ private def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
+ brokerIds.map { brokerId =>
+ val partitionsAssignedToThisBroker =
+ controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic) && p._2.contains(brokerId))
+ if(partitionsAssignedToThisBroker.size == 0)
+ info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
+ partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1.topic, p._1.partition, brokerId))
+ }.flatten.toSet
+ }
+
def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb161aa/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 3775eb4..2f5dff6 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -461,15 +461,13 @@ object ZkUtils extends Logging {
cluster
}
- def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+ def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topicAndPartitions: Set[TopicAndPartition])
+ : mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
val ret = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
- val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
- for((topic, partitions) <- partitionsForTopics) {
- for(partition <- partitions) {
- ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition.toInt) match {
- case Some(leaderIsrAndControllerEpoch) => ret.put(TopicAndPartition(topic, partition.toInt), leaderIsrAndControllerEpoch)
- case None =>
- }
+ for(topicAndPartition <- topicAndPartitions) {
+ ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition) match {
+ case Some(leaderIsrAndControllerEpoch) => ret.put(topicAndPartition, leaderIsrAndControllerEpoch)
+ case None =>
}
}
ret