You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/11/29 18:29:13 UTC

git commit: kafka-1152; ReplicaManager's handling of the leaderAndIsrRequest should gracefully handle leader == -1; patched by Swapnil Ghike; reviewed by Jun Rao

Updated Branches:
  refs/heads/trunk df288b75a -> 224f192c7


kafka-1152; ReplicaManager's handling of the leaderAndIsrRequest should gracefully handle leader == -1; patched by Swapnil Ghike; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/224f192c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/224f192c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/224f192c

Branch: refs/heads/trunk
Commit: 224f192c776b4348596fbf068771c98aa9db9f3d
Parents: df288b7
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Fri Nov 29 09:29:56 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Nov 29 09:29:56 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/server/ReplicaManager.scala     | 24 ++++++++++++++------
 1 file changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/224f192c/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 161f581..54f6e16 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -328,8 +328,7 @@ class ReplicaManager(val config: KafkaConfig,
    * the error message will be set on each partition since we do not know which partition caused it
    *  TODO: the above may need to be fixed later
    */
-  private def makeFollowers(controllerId: Int, epoch: Int,
-                            partitionState: Map[Partition, PartitionStateInfo],
+  private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
                             leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
       "starting the become-follower transition for partitions %s")
@@ -351,11 +350,22 @@ class ReplicaManager(val config: KafkaConfig,
         .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
 
       if (!isShuttingDown.get()) {
-        replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) =>
-          new TopicAndPartition(partition) ->
-            BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get,
-              partition.getReplica().get.logEndOffset)}
-        )
+        val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]()
+        partitionState.foreach {
+          case (partition, partitionStateInfo) =>
+            val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
+            leaders.find(_.id == leader) match {
+              case Some(leaderBroker) =>
+                partitionAndOffsets.put(new TopicAndPartition(partition), 
+                                        BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset))
+              case None =>
+                stateChangeLogger.trace("Broker %d ignored the become-follower state change with correlation id %d " +
+                                        "controller %d epoch %d for topic-partition %s since the designated leader %d " +
+                                        "cannot be found in live or shutting down brokers %s"
+                                          .format(localBrokerId, correlationId, controllerId, epoch, partition, leader, leaders))
+            }
+        }
+        replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
       }
       else {
         stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +