You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/10/23 01:33:38 UTC

[kafka] branch 2.0 updated: KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new a724c45  KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515)
a724c45 is described below

commit a724c454b85ca17b3affd23e2e82cd2912bc7513
Author: huxi <hu...@hotmail.com>
AuthorDate: Fri Aug 17 05:54:58 2018 +0800

    KAFKA-7299: Batch LeaderAndIsr requests for AutoLeaderRebalance (#5515)
    
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../main/scala/kafka/controller/KafkaController.scala   | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 11d22fd..f3192a3 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -990,16 +990,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
       // that need to be on this broker
       if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-        topicsNotInPreferredReplica.keys.foreach { topicPartition =>
-          // do this check only if the broker is live and there are no partitions being reassigned currently
-          // and preferred replica election is not in progress
-          if (controllerContext.isReplicaOnline(leaderBroker, topicPartition) &&
-            controllerContext.partitionsBeingReassigned.isEmpty &&
-            !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
-            controllerContext.allTopics.contains(topicPartition.topic)) {
-            onPreferredReplicaElection(Set(topicPartition), isTriggeredByAutoRebalance = true)
-          }
-        }
+        // do this check only if the broker is live and there are no partitions being reassigned currently
+        // and preferred replica election is not in progress
+        val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
+          controllerContext.partitionsBeingReassigned.isEmpty &&
+          !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
+          controllerContext.allTopics.contains(tp.topic))
+        onPreferredReplicaElection(candidatePartitions.toSet, isTriggeredByAutoRebalance = true)
       }
     }
   }