You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/17 21:30:21 UTC

[kafka] branch trunk updated: MINOR: Simplify controller election utilities (#6944)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 52cd59b  MINOR: Simplify controller election utilities (#6944)
52cd59b is described below

commit 52cd59bdb4de89c15f41984859469100f919dd6f
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 17 14:30:05 2019 -0700

    MINOR: Simplify controller election utilities (#6944)
    
    This patch simplifies the controller election API. We were passing `LeaderIsrAndControllerEpoch` into the election utilities even though we just needed `LeaderAndIsr`. We also remove some unneeded collection copies in `doElectLeaderForPartitions`.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../src/main/scala/kafka/controller/Election.scala | 58 +++++++++---------
 .../kafka/controller/PartitionStateMachine.scala   | 68 +++++++++++-----------
 .../controller/MockPartitionStateMachine.scala     | 40 ++++++-------
 3 files changed, 82 insertions(+), 84 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/Election.scala b/core/src/main/scala/kafka/controller/Election.scala
index 3896e00..e30f69a 100644
--- a/core/src/main/scala/kafka/controller/Election.scala
+++ b/core/src/main/scala/kafka/controller/Election.scala
@@ -24,21 +24,21 @@ case class ElectionResult(topicPartition: TopicPartition, leaderAndIsr: Option[L
 object Election {
 
   private def leaderForOffline(partition: TopicPartition,
-                               leaderIsrAndControllerEpochOpt: Option[LeaderIsrAndControllerEpoch],
+                               leaderAndIsrOpt: Option[LeaderAndIsr],
                                uncleanLeaderElectionEnabled: Boolean,
                                controllerContext: ControllerContext): ElectionResult = {
 
     val assignment = controllerContext.partitionReplicaAssignment(partition)
     val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
-    leaderIsrAndControllerEpochOpt match {
-      case Some(leaderIsrAndControllerEpoch) =>
-        val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+    leaderAndIsrOpt match {
+      case Some(leaderAndIsr) =>
+        val isr = leaderAndIsr.isr
         val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(
           assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
         val newLeaderAndIsrOpt = leaderOpt.map { leader =>
           val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
           else List(leader)
-          leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)
+          leaderAndIsr.newLeaderAndIsr(leader, newIsr)
         }
         ElectionResult(partition, newLeaderAndIsrOpt, liveReplicas)
 
@@ -59,22 +59,22 @@ object Election {
    */
   def leaderForOffline(
     controllerContext: ControllerContext,
-    partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]
+    partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderAndIsr], Boolean)]
   ): Seq[ElectionResult] = {
     partitionsWithUncleanLeaderElectionState.map {
-      case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
-        leaderForOffline(partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled, controllerContext)
+      case (partition, leaderAndIsrOpt, uncleanLeaderElectionEnabled) =>
+        leaderForOffline(partition, leaderAndIsrOpt, uncleanLeaderElectionEnabled, controllerContext)
     }
   }
 
   private def leaderForReassign(partition: TopicPartition,
-                                leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                leaderAndIsr: LeaderAndIsr,
                                 controllerContext: ControllerContext): ElectionResult = {
     val reassignment = controllerContext.partitionsBeingReassigned(partition).newReplicas
     val liveReplicas = reassignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
-    val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+    val isr = leaderAndIsr.isr
     val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment, isr, liveReplicas.toSet)
-    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeader(leader))
+    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))
     ElectionResult(partition, newLeaderAndIsrOpt, reassignment)
   }
 
@@ -82,26 +82,26 @@ object Election {
    * Elect leaders for partitions that are undergoing reassignment.
    *
    * @param controllerContext Context with the current state of the cluster
-   * @param leaderIsrAndControllerEpochs A sequence of tuples representing the partitions that need election
+   * @param leaderAndIsrs A sequence of tuples representing the partitions that need election
    *                                     and their respective leader/ISR states
    *
    * @return The election results
    */
   def leaderForReassign(controllerContext: ControllerContext,
-                        leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[ElectionResult] = {
-    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
-      leaderForReassign(partition, leaderIsrAndControllerEpoch, controllerContext)
+                        leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {
+    leaderAndIsrs.map { case (partition, leaderAndIsr) =>
+      leaderForReassign(partition, leaderAndIsr, controllerContext)
     }
   }
 
   private def leaderForPreferredReplica(partition: TopicPartition,
-                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                        leaderAndIsr: LeaderAndIsr,
                                         controllerContext: ControllerContext): ElectionResult = {
     val assignment = controllerContext.partitionReplicaAssignment(partition)
     val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
-    val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+    val isr = leaderAndIsr.isr
     val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet)
-    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeader(leader))
+    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))
     ElectionResult(partition, newLeaderAndIsrOpt, assignment)
   }
 
@@ -109,30 +109,30 @@ object Election {
    * Elect preferred leaders.
    *
    * @param controllerContext Context with the current state of the cluster
-   * @param leaderIsrAndControllerEpochs A sequence of tuples representing the partitions that need election
+   * @param leaderAndIsrs A sequence of tuples representing the partitions that need election
    *                                     and their respective leader/ISR states
    *
    * @return The election results
    */
   def leaderForPreferredReplica(controllerContext: ControllerContext,
-                                leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[ElectionResult] = {
-    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
-      leaderForPreferredReplica(partition, leaderIsrAndControllerEpoch, controllerContext)
+                                leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {
+    leaderAndIsrs.map { case (partition, leaderAndIsr) =>
+      leaderForPreferredReplica(partition, leaderAndIsr, controllerContext)
     }
   }
 
   private def leaderForControlledShutdown(partition: TopicPartition,
-                                          leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                          leaderAndIsr: LeaderAndIsr,
                                           shuttingDownBrokerIds: Set[Int],
                                           controllerContext: ControllerContext): ElectionResult = {
     val assignment = controllerContext.partitionReplicaAssignment(partition)
     val liveOrShuttingDownReplicas = assignment.filter(replica =>
       controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true))
-    val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+    val isr = leaderAndIsr.isr
     val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr,
       liveOrShuttingDownReplicas.toSet, shuttingDownBrokerIds)
     val newIsr = isr.filter(replica => !shuttingDownBrokerIds.contains(replica))
-    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr))
+    val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeaderAndIsr(leader, newIsr))
     ElectionResult(partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas)
   }
 
@@ -140,16 +140,16 @@ object Election {
    * Elect leaders for partitions whose current leaders are shutting down.
    *
    * @param controllerContext Context with the current state of the cluster
-   * @param leaderIsrAndControllerEpochs A sequence of tuples representing the partitions that need election
+   * @param leaderAndIsrs A sequence of tuples representing the partitions that need election
    *                                     and their respective leader/ISR states
    *
    * @return The election results
    */
   def leaderForControlledShutdown(controllerContext: ControllerContext,
-                                  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[ElectionResult] = {
+                                  leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {
     val shuttingDownBrokerIds = controllerContext.shuttingDownBrokerIds.toSet
-    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
-      leaderForControlledShutdown(partition, leaderIsrAndControllerEpoch, shuttingDownBrokerIds, controllerContext)
+    leaderAndIsrs.map { case (partition, leaderAndIsr) =>
+      leaderForControlledShutdown(partition, leaderAndIsr, shuttingDownBrokerIds, controllerContext)
     }
   }
 }
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ab4e8d4..fca78ef 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -371,17 +371,28 @@ class ZkPartitionStateMachine(config: KafkaConfig,
         return (partitions.map(_ -> Left(e))(breakOut), Seq.empty)
     }
     val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
-    val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicPartition, LeaderIsrAndControllerEpoch)]
+    val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
+
     getDataResponses.foreach { getDataResponse =>
       val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
       val currState = partitionState(partition)
       if (getDataResponse.resultCode == Code.OK) {
-        val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
-        if (leaderIsrAndControllerEpochOpt.isEmpty) {
-          val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
-          failedElections.put(partition, Left(exception))
+        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
+              val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
+                s"already written by another controller. This probably means that the current controller $controllerId went through " +
+                s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
+              failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
+            } else {
+              validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
+            }
+
+          case None =>
+            val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
+            failedElections.put(partition, Left(exception))
         }
-        leaderIsrAndControllerEpochPerPartition += partition -> leaderIsrAndControllerEpochOpt.get
+
       } else if (getDataResponse.resultCode == Code.NONODE) {
         val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
         failedElections.put(partition, Left(exception))
@@ -390,33 +401,23 @@ class ZkPartitionStateMachine(config: KafkaConfig,
       }
     }
 
-    val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
-      leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
-    }
-    invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
-      val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
-        s"already written by another controller. This probably means that the current controller $controllerId went through " +
-        s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
-      failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
-    }
-
-    if (validPartitionsForElection.isEmpty) {
+    if (validLeaderAndIsrs.isEmpty) {
       return (failedElections.toMap, Seq.empty)
     }
 
     val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
       case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
         val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
-          validPartitionsForElection,
+          validLeaderAndIsrs,
           allowUnclean
         )
         leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
       case ReassignPartitionLeaderElectionStrategy =>
-        leaderForReassign(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
+        leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
       case PreferredReplicaPartitionLeaderElectionStrategy =>
-        leaderForPreferredReplica(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
+        leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
       case ControlledShutdownPartitionLeaderElectionStrategy =>
-        leaderForControlledShutdown(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
+        leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
     }
     partitionsWithoutLeaders.foreach { electionResult =>
       val partition = electionResult.topicPartition
@@ -454,21 +455,18 @@ class ZkPartitionStateMachine(config: KafkaConfig,
    *         3. allow unclean
    */
   private def collectUncleanLeaderElectionState(
-    leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)],
+    leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)],
     allowUnclean: Boolean
-  ): Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)] = {
-    val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderIsrAndControllerEpochs.partition {
-      case (partition, leaderIsrAndControllerEpoch) =>
-        val liveInSyncReplicas = leaderIsrAndControllerEpoch
-          .leaderAndIsr
-          .isr
-          .filter(replica => controllerContext.isReplicaOnline(replica, partition))
+  ): Seq[(TopicPartition, Option[LeaderAndIsr], Boolean)] = {
+    val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderAndIsrs.partition {
+      case (partition, leaderAndIsr) =>
+        val liveInSyncReplicas = leaderAndIsr.isr.filter(controllerContext.isReplicaOnline(_, partition))
         liveInSyncReplicas.isEmpty
     }
 
     val electionForPartitionWithoutLiveReplicas = if (allowUnclean) {
-      partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
-        (partition, Option(leaderIsrAndControllerEpoch), true)
+      partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
+        (partition, Option(leaderAndIsr), true)
       }
     } else {
       val (logConfigs, failed) = zkClient.getLogConfigs(
@@ -476,14 +474,14 @@ class ZkPartitionStateMachine(config: KafkaConfig,
         config.originals()
       )
 
-      partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
+      partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
         if (failed.contains(partition.topic)) {
           logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
           (partition, None, false)
         } else {
           (
             partition,
-            Option(leaderIsrAndControllerEpoch),
+            Option(leaderAndIsr),
             logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue()
           )
         }
@@ -491,8 +489,8 @@ class ZkPartitionStateMachine(config: KafkaConfig,
     }
 
     electionForPartitionWithoutLiveReplicas ++
-    partitionsWithLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
-      (partition, Option(leaderIsrAndControllerEpoch), false)
+    partitionsWithLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
+      (partition, Option(leaderAndIsr), false)
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
index 0e2c8be..94fd177 100644
--- a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
@@ -20,7 +20,8 @@ import kafka.api.LeaderAndIsr
 import kafka.common.StateChangeFailedException
 import kafka.controller.Election._
 import org.apache.kafka.common.TopicPartition
-import scala.collection.breakOut
+
+import scala.collection.{breakOut, mutable}
 
 class MockPartitionStateMachine(controllerContext: ControllerContext,
                                 uncleanLeaderElectionEnabled: Boolean)
@@ -68,34 +69,33 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
     partitions: Seq[TopicPartition],
     leaderElectionStrategy: PartitionLeaderElectionStrategy
   ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
-    val leaderIsrAndControllerEpochPerPartition = partitions.map { partition =>
-      partition -> controllerContext.partitionLeadershipInfo(partition)
-    }
-
-    val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
-      leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
-    }
-
-    val failedElections = invalidPartitionsForElection.map { case (partition, leaderIsrAndControllerEpoch) =>
-      val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
-        s"already written by another controller. This probably means that the current controller went through " +
-        s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
-
-      partition -> Left(new StateChangeFailedException(failMsg))
+    val failedElections = mutable.Map.empty[TopicPartition, Either[Throwable, LeaderAndIsr]]
+    val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
+
+    for (partition <- partitions) {
+      val leaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(partition)
+      if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
+        val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
+          s"already written by another controller. This probably means that the current controller went through " +
+          s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
+        failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
+      } else {
+        validLeaderAndIsrs.append((partition, leaderIsrAndControllerEpoch.leaderAndIsr))
+      }
     }
 
     val electionResults = leaderElectionStrategy match {
       case OfflinePartitionLeaderElectionStrategy(isUnclean) =>
-        val partitionsWithUncleanLeaderElectionState = validPartitionsForElection.map { case (partition, leaderIsrAndControllerEpoch) =>
-          (partition, Some(leaderIsrAndControllerEpoch), isUnclean || uncleanLeaderElectionEnabled)
+        val partitionsWithUncleanLeaderElectionState = validLeaderAndIsrs.map { case (partition, leaderAndIsr) =>
+          (partition, Some(leaderAndIsr), isUnclean || uncleanLeaderElectionEnabled)
         }
         leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState)
       case ReassignPartitionLeaderElectionStrategy =>
-        leaderForReassign(controllerContext, validPartitionsForElection)
+        leaderForReassign(controllerContext, validLeaderAndIsrs)
       case PreferredReplicaPartitionLeaderElectionStrategy =>
-        leaderForPreferredReplica(controllerContext, validPartitionsForElection)
+        leaderForPreferredReplica(controllerContext, validLeaderAndIsrs)
       case ControlledShutdownPartitionLeaderElectionStrategy =>
-        leaderForControlledShutdown(controllerContext, validPartitionsForElection)
+        leaderForControlledShutdown(controllerContext, validLeaderAndIsrs)
     }
 
     val results: Map[TopicPartition, Either[Exception, LeaderAndIsr]] = electionResults.map { electionResult =>