You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC

[11/28] git commit: KAFKA-705 Shutting down brokers should not receive stop replica requests if controlled shutdown status is incomplete; reviewed by Neha Narkhede

KAFKA-705 Shutting down brokers should not receive stop replica requests if controlled shutdown status is incomplete; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0d197f9b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0d197f9b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0d197f9b

Branch: refs/heads/trunk
Commit: 0d197f9b97c586d53bb5328eaac67ef0c39601bd
Parents: 5e3e181
Author: Joel Koshy <jj...@apache.org>
Authored: Tue Jan 22 12:12:11 2013 -0800
Committer: Joel Koshy <jj...@apache.org>
Committed: Tue Jan 22 12:12:11 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/controller/KafkaController.scala   |   35 ++++++++-------
 1 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0d197f9b/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 7b5d5c2..565c53a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -166,31 +166,34 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         }
       }
 
+      val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
+
       /*
       * Force the shutting down broker out of the ISR of partitions that it
       * follows, and shutdown the corresponding replica fetcher threads.
       * This is really an optimization, so no need to register any callback
       * to wait until completion.
       */
-      brokerRequestBatch.newBatch()
-      allPartitionsAndReplicationFactorOnBroker foreach {
-        case(topicAndPartition, replicationFactor) =>
-          val (topic, partition) = topicAndPartition.asTuple
-          if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) {
-            brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
-            removeReplicaFromIsr(topic, partition, id) match {
-              case Some(updatedLeaderIsrAndControllerEpoch) =>
-                brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
-                  Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
-                  updatedLeaderIsrAndControllerEpoch, replicationFactor)
-              case None =>
-              // ignore
+      if (partitionsRemaining.size == 0) {
+        brokerRequestBatch.newBatch()
+        allPartitionsAndReplicationFactorOnBroker foreach {
+          case(topicAndPartition, replicationFactor) =>
+            val (topic, partition) = topicAndPartition.asTuple
+            if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) {
+              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
+              removeReplicaFromIsr(topic, partition, id) match {
+                case Some(updatedLeaderIsrAndControllerEpoch) =>
+                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
+                    Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
+                    updatedLeaderIsrAndControllerEpoch, replicationFactor)
+                case None =>
+                // ignore
+              }
             }
-          }
+        }
+        brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
       }
-      brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
 
-      val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
       debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
       partitionsRemaining.size
     }