You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2020/04/27 20:51:40 UTC

[kafka] branch trunk updated: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (#8524)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new db9e55a  KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (#8524)
db9e55a is described below

commit db9e55a50f93d82e4aad5e4f82a13fac0e93759e
Author: Leonard Ge <62...@users.noreply.github.com>
AuthorDate: Mon Apr 27 21:51:10 2020 +0100

    KAFKA-9866: Avoid election for topics where preferred leader is not in ISR (#8524)
    
    In this commit we made sure that the auto leader election only happens after the newly starter broker is in the isr.
    
    No accompany tests are added due to the fact that:
    
    this is a change to the private method and no public facing change is made
    it is hard to create tests for this change without considerable effort
    
    Reviewers: Stanislav Kozlovski <st...@outlook.com>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/controller/KafkaController.scala  | 13 ++++++++++++-
 .../unit/kafka/controller/ControllerIntegrationTest.scala   |  4 ++--
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 9ad7b6f..82be66a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig,
         val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
           controllerContext.partitionsBeingReassigned.isEmpty &&
           !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
-          controllerContext.allTopics.contains(tp.topic))
+          controllerContext.allTopics.contains(tp.topic) &&
+          canPreferredReplicaBeLeader(tp)
+       )
         onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
       }
     }
   }
 
+  private def canPreferredReplicaBeLeader(tp: TopicPartition): Boolean = {
+    val assignment = controllerContext.partitionReplicaAssignment(tp)
+    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, tp))
+    val isr = controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr
+    PartitionLeaderElectionAlgorithms
+      .preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet)
+      .nonEmpty
+  }
+
   private def processAutoPreferredReplicaLeaderElection(): Unit = {
     if (!isActive) return
     try {
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index c4b5f47..c7a1cd5 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -433,8 +433,8 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     waitForPartitionState(tp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
-    servers(1).shutdown()
-    servers(1).awaitShutdown()
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
     TestUtils.waitUntilTrue(() => {
       val leaderIsrAndControllerEpochMap = zkClient.getTopicPartitionStates(Seq(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&