You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/10/23 01:33:38 UTC
[kafka] branch 2.0 updated: KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new a724c45 KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515)
a724c45 is described below
commit a724c454b85ca17b3affd23e2e82cd2912bc7513
Author: huxi <hu...@hotmail.com>
AuthorDate: Fri Aug 17 05:54:58 2018 +0800
KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515)
Reviewers: Jun Rao <ju...@gmail.com>
---
.../main/scala/kafka/controller/KafkaController.scala | 17 +++++++----------
1 file changed, 7 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 11d22fd..f3192a3 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -990,16 +990,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
- topicsNotInPreferredReplica.keys.foreach { topicPartition =>
- // do this check only if the broker is live and there are no partitions being reassigned currently
- // and preferred replica election is not in progress
- if (controllerContext.isReplicaOnline(leaderBroker, topicPartition) &&
- controllerContext.partitionsBeingReassigned.isEmpty &&
- !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
- controllerContext.allTopics.contains(topicPartition.topic)) {
- onPreferredReplicaElection(Set(topicPartition), isTriggeredByAutoRebalance = true)
- }
- }
+ // only consider partitions whose replica on this broker is online, while no partitions are being
+ // reassigned, and whose topic still exists and is not queued up for deletion
+ val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
+ controllerContext.partitionsBeingReassigned.isEmpty &&
+ !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
+ controllerContext.allTopics.contains(tp.topic))
+ onPreferredReplicaElection(candidatePartitions.toSet, isTriggeredByAutoRebalance = true)
}
}
}