You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC
[7/28] git commit: KAFKA-705 Allow restarted replicas to follow a leader that is on a shutting down broker; reviewed by Neha Narkhede.
KAFKA-705 Allow restarted replicas to follow a leader that is on a shutting down broker; reviewed by Neha Narkhede.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f112ef2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f112ef2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f112ef2b
Branch: refs/heads/trunk
Commit: f112ef2b5799cc8cf7786e98d1269145a19ef361
Parents: 5490884
Author: Joel Koshy <jj...@apache.org>
Authored: Mon Jan 21 10:47:31 2013 -0800
Committer: Joel Koshy <jj...@apache.org>
Committed: Mon Jan 21 10:47:31 2013 -0800
----------------------------------------------------------------------
.../kafka/controller/ReplicaStateMachine.scala | 16 +++++++-------
1 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f112ef2b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 1d5bac4..264285c 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -140,14 +140,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
// check if the leader for this partition is alive or even exists
controllerContext.allLeaders.get(topicAndPartition) match {
case Some(leaderIsrAndControllerEpoch) =>
- controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
- case true => // leader is alive
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
- topic, partition, leaderIsrAndControllerEpoch,
- replicaAssignment.size)
- replicaState.put((topic, partition, replicaId), OnlineReplica)
- info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
- case false => // ignore partitions whose leader is not alive
+ val leader = leaderIsrAndControllerEpoch.leaderAndIsr.leader
+ if (controllerContext.liveOrShuttingDownBrokerIds.contains(leader)) {
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
+ topic, partition, leaderIsrAndControllerEpoch,
+ replicaAssignment.size)
+ replicaState.put((topic, partition, replicaId), OnlineReplica)
+ info("Replica %d for partition [%s, %d] state changed to OnlineReplica"
+ .format(replicaId, topic, partition))
}
case None => // ignore partitions who don't have a leader yet
}