You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/06/17 21:30:21 UTC
[kafka] branch trunk updated: MINOR: Simplify controller election utilities (#6944)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 52cd59b MINOR: Simplify controller election utilities (#6944)
52cd59b is described below
commit 52cd59bdb4de89c15f41984859469100f919dd6f
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Jun 17 14:30:05 2019 -0700
MINOR: Simplify controller election utilities (#6944)
This patch simplifies the controller election API. We were passing `LeaderIsrAndControllerEpoch` into the election utilities even though we only needed `LeaderAndIsr`. We also remove some unneeded collection copies in `doElectLeaderForPartitions`.
Reviewers: Ismael Juma <is...@juma.me.uk>
---
.../src/main/scala/kafka/controller/Election.scala | 58 +++++++++---------
.../kafka/controller/PartitionStateMachine.scala | 68 +++++++++++-----------
.../controller/MockPartitionStateMachine.scala | 40 ++++++-------
3 files changed, 82 insertions(+), 84 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/Election.scala b/core/src/main/scala/kafka/controller/Election.scala
index 3896e00..e30f69a 100644
--- a/core/src/main/scala/kafka/controller/Election.scala
+++ b/core/src/main/scala/kafka/controller/Election.scala
@@ -24,21 +24,21 @@ case class ElectionResult(topicPartition: TopicPartition, leaderAndIsr: Option[L
object Election {
private def leaderForOffline(partition: TopicPartition,
- leaderIsrAndControllerEpochOpt: Option[LeaderIsrAndControllerEpoch],
+ leaderAndIsrOpt: Option[LeaderAndIsr],
uncleanLeaderElectionEnabled: Boolean,
controllerContext: ControllerContext): ElectionResult = {
val assignment = controllerContext.partitionReplicaAssignment(partition)
val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
- leaderIsrAndControllerEpochOpt match {
- case Some(leaderIsrAndControllerEpoch) =>
- val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+ leaderAndIsrOpt match {
+ case Some(leaderAndIsr) =>
+ val isr = leaderAndIsr.isr
val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(
assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
val newLeaderAndIsrOpt = leaderOpt.map { leader =>
val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
else List(leader)
- leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)
+ leaderAndIsr.newLeaderAndIsr(leader, newIsr)
}
ElectionResult(partition, newLeaderAndIsrOpt, liveReplicas)
@@ -59,22 +59,22 @@ object Election {
*/
def leaderForOffline(
controllerContext: ControllerContext,
- partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]
+ partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderAndIsr], Boolean)]
): Seq[ElectionResult] = {
partitionsWithUncleanLeaderElectionState.map {
- case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
- leaderForOffline(partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled, controllerContext)
+ case (partition, leaderAndIsrOpt, uncleanLeaderElectionEnabled) =>
+ leaderForOffline(partition, leaderAndIsrOpt, uncleanLeaderElectionEnabled, controllerContext)
}
}
private def leaderForReassign(partition: TopicPartition,
- leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+ leaderAndIsr: LeaderAndIsr,
controllerContext: ControllerContext): ElectionResult = {
val reassignment = controllerContext.partitionsBeingReassigned(partition).newReplicas
val liveReplicas = reassignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
- val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+ val isr = leaderAndIsr.isr
val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment, isr, liveReplicas.toSet)
- val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeader(leader))
+ val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))
ElectionResult(partition, newLeaderAndIsrOpt, reassignment)
}
@@ -82,26 +82,26 @@ object Election {
* Elect leaders for partitions that are undergoing reassignment.
*
* @param controllerContext Context with the current state of the cluster
- * @param leaderIsrAndControllerEpochs A sequence of tuples representing the partitions that need election
+ * @param leaderAndIsrs A sequence of tuples representing the partitions that need election
* and their respective leader/ISR states
*
* @return The election results
*/
def leaderForReassign(controllerContext: ControllerContext,
- leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[ElectionResult] = {
- leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
- leaderForReassign(partition, leaderIsrAndControllerEpoch, controllerContext)
+ leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {
+ leaderAndIsrs.map { case (partition, leaderAndIsr) =>
+ leaderForReassign(partition, leaderAndIsr, controllerContext)
}
}
private def leaderForPreferredReplica(partition: TopicPartition,
- leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+ leaderAndIsr: LeaderAndIsr,
controllerContext: ControllerContext): ElectionResult = {
val assignment = controllerContext.partitionReplicaAssignment(partition)
val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
- val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+ val isr = leaderAndIsr.isr
val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment, isr, liveReplicas.toSet)
- val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeader(leader))
+ val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeader(leader))
ElectionResult(partition, newLeaderAndIsrOpt, assignment)
}
@@ -109,30 +109,30 @@ object Election {
* Elect preferred leaders.
*
* @param controllerContext Context with the current state of the cluster
- * @param leaderIsrAndControllerEpochs A sequence of tuples representing the partitions that need election
+ * @param leaderAndIsrs A sequence of tuples representing the partitions that need election
* and their respective leader/ISR states
*
* @return The election results
*/
def leaderForPreferredReplica(controllerContext: ControllerContext,
- leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[ElectionResult] = {
- leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
- leaderForPreferredReplica(partition, leaderIsrAndControllerEpoch, controllerContext)
+ leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {
+ leaderAndIsrs.map { case (partition, leaderAndIsr) =>
+ leaderForPreferredReplica(partition, leaderAndIsr, controllerContext)
}
}
private def leaderForControlledShutdown(partition: TopicPartition,
- leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+ leaderAndIsr: LeaderAndIsr,
shuttingDownBrokerIds: Set[Int],
controllerContext: ControllerContext): ElectionResult = {
val assignment = controllerContext.partitionReplicaAssignment(partition)
val liveOrShuttingDownReplicas = assignment.filter(replica =>
controllerContext.isReplicaOnline(replica, partition, includeShuttingDownBrokers = true))
- val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+ val isr = leaderAndIsr.isr
val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr,
liveOrShuttingDownReplicas.toSet, shuttingDownBrokerIds)
val newIsr = isr.filter(replica => !shuttingDownBrokerIds.contains(replica))
- val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr))
+ val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderAndIsr.newLeaderAndIsr(leader, newIsr))
ElectionResult(partition, newLeaderAndIsrOpt, liveOrShuttingDownReplicas)
}
@@ -140,16 +140,16 @@ object Election {
* Elect leaders for partitions whose current leaders are shutting down.
*
* @param controllerContext Context with the current state of the cluster
- * @param leaderIsrAndControllerEpochs A sequence of tuples representing the partitions that need election
+ * @param leaderAndIsrs A sequence of tuples representing the partitions that need election
* and their respective leader/ISR states
*
* @return The election results
*/
def leaderForControlledShutdown(controllerContext: ControllerContext,
- leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[ElectionResult] = {
+ leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)]): Seq[ElectionResult] = {
val shuttingDownBrokerIds = controllerContext.shuttingDownBrokerIds.toSet
- leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
- leaderForControlledShutdown(partition, leaderIsrAndControllerEpoch, shuttingDownBrokerIds, controllerContext)
+ leaderAndIsrs.map { case (partition, leaderAndIsr) =>
+ leaderForControlledShutdown(partition, leaderAndIsr, shuttingDownBrokerIds, controllerContext)
}
}
}
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ab4e8d4..fca78ef 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -371,17 +371,28 @@ class ZkPartitionStateMachine(config: KafkaConfig,
return (partitions.map(_ -> Left(e))(breakOut), Seq.empty)
}
val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
- val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicPartition, LeaderIsrAndControllerEpoch)]
+ val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
+
getDataResponses.foreach { getDataResponse =>
val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
val currState = partitionState(partition)
if (getDataResponse.resultCode == Code.OK) {
- val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
- if (leaderIsrAndControllerEpochOpt.isEmpty) {
- val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
- failedElections.put(partition, Left(exception))
+ TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
+ case Some(leaderIsrAndControllerEpoch) =>
+ if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
+ val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
+ s"already written by another controller. This probably means that the current controller $controllerId went through " +
+ s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
+ failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
+ } else {
+ validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
+ }
+
+ case None =>
+ val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
+ failedElections.put(partition, Left(exception))
}
- leaderIsrAndControllerEpochPerPartition += partition -> leaderIsrAndControllerEpochOpt.get
+
} else if (getDataResponse.resultCode == Code.NONODE) {
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
failedElections.put(partition, Left(exception))
@@ -390,33 +401,23 @@ class ZkPartitionStateMachine(config: KafkaConfig,
}
}
- val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
- leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
- }
- invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
- val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
- s"already written by another controller. This probably means that the current controller $controllerId went through " +
- s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
- failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
- }
-
- if (validPartitionsForElection.isEmpty) {
+ if (validLeaderAndIsrs.isEmpty) {
return (failedElections.toMap, Seq.empty)
}
val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
- validPartitionsForElection,
+ validLeaderAndIsrs,
allowUnclean
)
leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
case ReassignPartitionLeaderElectionStrategy =>
- leaderForReassign(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
+ leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
case PreferredReplicaPartitionLeaderElectionStrategy =>
- leaderForPreferredReplica(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
+ leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
case ControlledShutdownPartitionLeaderElectionStrategy =>
- leaderForControlledShutdown(controllerContext, validPartitionsForElection).partition(_.leaderAndIsr.isEmpty)
+ leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
}
partitionsWithoutLeaders.foreach { electionResult =>
val partition = electionResult.topicPartition
@@ -454,21 +455,18 @@ class ZkPartitionStateMachine(config: KafkaConfig,
* 3. allow unclean
*/
private def collectUncleanLeaderElectionState(
- leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)],
+ leaderAndIsrs: Seq[(TopicPartition, LeaderAndIsr)],
allowUnclean: Boolean
- ): Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)] = {
- val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderIsrAndControllerEpochs.partition {
- case (partition, leaderIsrAndControllerEpoch) =>
- val liveInSyncReplicas = leaderIsrAndControllerEpoch
- .leaderAndIsr
- .isr
- .filter(replica => controllerContext.isReplicaOnline(replica, partition))
+ ): Seq[(TopicPartition, Option[LeaderAndIsr], Boolean)] = {
+ val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderAndIsrs.partition {
+ case (partition, leaderAndIsr) =>
+ val liveInSyncReplicas = leaderAndIsr.isr.filter(controllerContext.isReplicaOnline(_, partition))
liveInSyncReplicas.isEmpty
}
val electionForPartitionWithoutLiveReplicas = if (allowUnclean) {
- partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
- (partition, Option(leaderIsrAndControllerEpoch), true)
+ partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
+ (partition, Option(leaderAndIsr), true)
}
} else {
val (logConfigs, failed) = zkClient.getLogConfigs(
@@ -476,14 +474,14 @@ class ZkPartitionStateMachine(config: KafkaConfig,
config.originals()
)
- partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
+ partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
if (failed.contains(partition.topic)) {
logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
(partition, None, false)
} else {
(
partition,
- Option(leaderIsrAndControllerEpoch),
+ Option(leaderAndIsr),
logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue()
)
}
@@ -491,8 +489,8 @@ class ZkPartitionStateMachine(config: KafkaConfig,
}
electionForPartitionWithoutLiveReplicas ++
- partitionsWithLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
- (partition, Option(leaderIsrAndControllerEpoch), false)
+ partitionsWithLiveInSyncReplicas.map { case (partition, leaderAndIsr) =>
+ (partition, Option(leaderAndIsr), false)
}
}
diff --git a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
index 0e2c8be..94fd177 100644
--- a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
@@ -20,7 +20,8 @@ import kafka.api.LeaderAndIsr
import kafka.common.StateChangeFailedException
import kafka.controller.Election._
import org.apache.kafka.common.TopicPartition
-import scala.collection.breakOut
+
+import scala.collection.{breakOut, mutable}
class MockPartitionStateMachine(controllerContext: ControllerContext,
uncleanLeaderElectionEnabled: Boolean)
@@ -68,34 +69,33 @@ class MockPartitionStateMachine(controllerContext: ControllerContext,
partitions: Seq[TopicPartition],
leaderElectionStrategy: PartitionLeaderElectionStrategy
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
- val leaderIsrAndControllerEpochPerPartition = partitions.map { partition =>
- partition -> controllerContext.partitionLeadershipInfo(partition)
- }
-
- val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
- leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
- }
-
- val failedElections = invalidPartitionsForElection.map { case (partition, leaderIsrAndControllerEpoch) =>
- val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
- s"already written by another controller. This probably means that the current controller went through " +
- s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
-
- partition -> Left(new StateChangeFailedException(failMsg))
+ val failedElections = mutable.Map.empty[TopicPartition, Either[Throwable, LeaderAndIsr]]
+ val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
+
+ for (partition <- partitions) {
+ val leaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(partition)
+ if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
+ val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
+ s"already written by another controller. This probably means that the current controller went through " +
+ s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
+ failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
+ } else {
+ validLeaderAndIsrs.append((partition, leaderIsrAndControllerEpoch.leaderAndIsr))
+ }
}
val electionResults = leaderElectionStrategy match {
case OfflinePartitionLeaderElectionStrategy(isUnclean) =>
- val partitionsWithUncleanLeaderElectionState = validPartitionsForElection.map { case (partition, leaderIsrAndControllerEpoch) =>
- (partition, Some(leaderIsrAndControllerEpoch), isUnclean || uncleanLeaderElectionEnabled)
+ val partitionsWithUncleanLeaderElectionState = validLeaderAndIsrs.map { case (partition, leaderAndIsr) =>
+ (partition, Some(leaderAndIsr), isUnclean || uncleanLeaderElectionEnabled)
}
leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState)
case ReassignPartitionLeaderElectionStrategy =>
- leaderForReassign(controllerContext, validPartitionsForElection)
+ leaderForReassign(controllerContext, validLeaderAndIsrs)
case PreferredReplicaPartitionLeaderElectionStrategy =>
- leaderForPreferredReplica(controllerContext, validPartitionsForElection)
+ leaderForPreferredReplica(controllerContext, validLeaderAndIsrs)
case ControlledShutdownPartitionLeaderElectionStrategy =>
- leaderForControlledShutdown(controllerContext, validPartitionsForElection)
+ leaderForControlledShutdown(controllerContext, validLeaderAndIsrs)
}
val results: Map[TopicPartition, Either[Exception, LeaderAndIsr]] = electionResults.map { electionResult =>