You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC
[11/28] git commit: KAFKA-705 Shutting down brokers should not
receive stop replica requests if controlled shutdown status is incomplete;
reviewed by Neha Narkhede
KAFKA-705 Shutting down brokers should not receive stop replica requests if controlled shutdown status is incomplete; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0d197f9b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0d197f9b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0d197f9b
Branch: refs/heads/trunk
Commit: 0d197f9b97c586d53bb5328eaac67ef0c39601bd
Parents: 5e3e181
Author: Joel Koshy <jj...@apache.org>
Authored: Tue Jan 22 12:12:11 2013 -0800
Committer: Joel Koshy <jj...@apache.org>
Committed: Tue Jan 22 12:12:11 2013 -0800
----------------------------------------------------------------------
.../scala/kafka/controller/KafkaController.scala | 35 ++++++++-------
1 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0d197f9b/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 7b5d5c2..565c53a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -166,31 +166,34 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
}
+ val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
+
/*
* Force the shutting down broker out of the ISR of partitions that it
* follows, and shutdown the corresponding replica fetcher threads.
* This is really an optimization, so no need to register any callback
* to wait until completion.
*/
- brokerRequestBatch.newBatch()
- allPartitionsAndReplicationFactorOnBroker foreach {
- case(topicAndPartition, replicationFactor) =>
- val (topic, partition) = topicAndPartition.asTuple
- if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) {
- brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
- removeReplicaFromIsr(topic, partition, id) match {
- case Some(updatedLeaderIsrAndControllerEpoch) =>
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
- Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
- updatedLeaderIsrAndControllerEpoch, replicationFactor)
- case None =>
- // ignore
+ if (partitionsRemaining.size == 0) {
+ brokerRequestBatch.newBatch()
+ allPartitionsAndReplicationFactorOnBroker foreach {
+ case(topicAndPartition, replicationFactor) =>
+ val (topic, partition) = topicAndPartition.asTuple
+ if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) {
+ brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
+ removeReplicaFromIsr(topic, partition, id) match {
+ case Some(updatedLeaderIsrAndControllerEpoch) =>
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
+ Seq(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader), topic, partition,
+ updatedLeaderIsrAndControllerEpoch, replicationFactor)
+ case None =>
+ // ignore
+ }
}
- }
+ }
+ brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
}
- brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
- val partitionsRemaining = replicatedPartitionsBrokerLeads().toSet
debug("Remaining partitions to move on broker %d: %s".format(id, partitionsRemaining.mkString(",")))
partitionsRemaining.size
}