You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2013/01/21 19:48:03 UTC

git commit: KAFKA-705 Allow restarted replicas to follow a leader that is on a shutting down broker; reviewed by Neha Narkhede.

Updated Branches:
  refs/heads/0.8 5490884e9 -> f112ef2b5


KAFKA-705 Allow restarted replicas to follow a leader that is on a shutting down broker; reviewed by Neha Narkhede.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f112ef2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f112ef2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f112ef2b

Branch: refs/heads/0.8
Commit: f112ef2b5799cc8cf7786e98d1269145a19ef361
Parents: 5490884
Author: Joel Koshy <jj...@apache.org>
Authored: Mon Jan 21 10:47:31 2013 -0800
Committer: Joel Koshy <jj...@apache.org>
Committed: Mon Jan 21 10:47:31 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/ReplicaStateMachine.scala     |   16 +++++++-------
 1 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f112ef2b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 1d5bac4..264285c 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -140,14 +140,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               // check if the leader for this partition is alive or even exists
                 controllerContext.allLeaders.get(topicAndPartition) match {
                 case Some(leaderIsrAndControllerEpoch) =>
-                  controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
-                    case true => // leader is alive
-                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                                                                          topic, partition, leaderIsrAndControllerEpoch,
-                                                                          replicaAssignment.size)
-                      replicaState.put((topic, partition, replicaId), OnlineReplica)
-                      info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
-                    case false => // ignore partitions whose leader is not alive
+                  val leader = leaderIsrAndControllerEpoch.leaderAndIsr.leader
+                  if (controllerContext.liveOrShuttingDownBrokerIds.contains(leader)) {
+                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
+                                                                        topic, partition, leaderIsrAndControllerEpoch,
+                                                                        replicaAssignment.size)
+                    replicaState.put((topic, partition, replicaId), OnlineReplica)
+                    info("Replica %d for partition [%s, %d] state changed to OnlineReplica"
+                         .format(replicaId, topic, partition))
                   }
                 case None => // ignore partitions who don't have a leader yet
               }