You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/07/03 06:54:48 UTC

git commit: kafka-1503; all partitions are using same broker as their leader after broker is down; patched by Jianwen Wang; reviewed by Guozhang Wang and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk b8d87d0b2 -> 2a4718c1a


kafka-1503; all partitions are using same broker as their leader after broker is down;  patched by Jianwen Wang; reviewed by Guozhang Wang and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2a4718c1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2a4718c1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2a4718c1

Branch: refs/heads/trunk
Commit: 2a4718c1a7e8e1566c5c87468779fdd1f95fe3bc
Parents: b8d87d0
Author: Jianwen Wang <Ja...@gmail.com>
Authored: Wed Jul 2 21:54:42 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jul 2 21:54:42 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/controller/PartitionLeaderSelector.scala   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2a4718c1/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index d3b25fa..4a31c72 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -83,7 +83,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
                 new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
             }
           case false =>
-            val newLeader = liveBrokersInIsr.head
+            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
+            val newLeader = liveReplicasInIsr.head
             debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
                   .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
@@ -210,4 +211,4 @@ class NoOpLeaderSelector(controllerContext: ControllerContext) extends Partition
     warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
     (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
   }
-}
\ No newline at end of file
+}