You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/30 05:08:39 UTC
[kafka] branch trunk updated: KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2ef6ee2 KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668)
2ef6ee2 is described below
commit 2ef6ee2338178c7501f5bd4c7cce5f4cea9d3e17
Author: gitlw <lu...@gmail.com>
AuthorDate: Thu Mar 29 22:08:28 2018 -0700
KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668)
Reviewed by Jun Rao <ju...@gmail.com>
---
.../scala/kafka/controller/ControllerContext.scala | 90 ++++++++++++----
.../scala/kafka/controller/KafkaController.scala | 117 ++++++++++-----------
.../kafka/controller/PartitionStateMachine.scala | 2 +-
.../kafka/controller/ReplicaStateMachine.scala | 7 +-
.../kafka/controller/TopicDeletionManager.scala | 3 +-
.../controller/PartitionStateMachineTest.scala | 16 +--
.../kafka/controller/ReplicaStateMachineTest.scala | 12 +--
7 files changed, 148 insertions(+), 99 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 541bce8..f4671cf 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -31,14 +31,46 @@ class ControllerContext {
var epoch: Int = KafkaController.InitialControllerEpoch - 1
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
var allTopics: Set[String] = Set.empty
- var partitionReplicaAssignment: mutable.Map[TopicPartition, Seq[Int]] = mutable.Map.empty
- var partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
+ private var partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty
+ val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty
val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty
private var liveBrokersUnderlying: Set[Broker] = Set.empty
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
+ def partitionReplicaAssignment(topicPartition: TopicPartition): Seq[Int] = {
+ partitionReplicaAssignmentUnderlying.getOrElse(topicPartition.topic, mutable.Map.empty)
+ .getOrElse(topicPartition.partition, Seq.empty)
+ }
+
+ private def clearTopicsState(): Unit = {
+ allTopics = Set.empty
+ partitionReplicaAssignmentUnderlying.clear()
+ partitionLeadershipInfo.clear()
+ partitionsBeingReassigned.clear()
+ replicasOnOfflineDirs.clear()
+ }
+
+ def updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = {
+ partitionReplicaAssignmentUnderlying.getOrElseUpdate(topicPartition.topic, mutable.Map.empty)
+ .put(topicPartition.partition, newReplicas)
+ }
+
+ def partitionReplicaAssignmentForTopic(topic : String): Map[TopicPartition, Seq[Int]] = {
+ partitionReplicaAssignmentUnderlying.getOrElse(topic, Map.empty).map {
+ case (partition, replicas) => (new TopicPartition(topic, partition), replicas)
+ }.toMap
+ }
+
+ def allPartitions: Set[TopicPartition] = {
+ partitionReplicaAssignmentUnderlying.flatMap {
+ case (topic, topicReplicaAssignment) => topicReplicaAssignment.map {
+ case (partition, _) => new TopicPartition(topic, partition)
+ }
+ }.toSet
+ }
+
// setter
def liveBrokers_=(brokers: Set[Broker]) {
liveBrokersUnderlying = brokers
@@ -53,8 +85,12 @@ class ControllerContext {
def liveOrShuttingDownBrokers = liveBrokersUnderlying
def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = {
- partitionReplicaAssignment.collect {
- case (topicPartition, replicas) if replicas.contains(brokerId) => topicPartition
+ partitionReplicaAssignmentUnderlying.flatMap {
+ case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {
+ case (_, replicas) => replicas.contains(brokerId)
+ }.map {
+ case (partition, _) => new TopicPartition(topic, partition)
+ }
}.toSet
}
@@ -68,22 +104,26 @@ class ControllerContext {
def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
brokerIds.flatMap { brokerId =>
- partitionReplicaAssignment.collect { case (topicPartition, replicas) if replicas.contains(brokerId) =>
- PartitionAndReplica(topicPartition, brokerId)
+ partitionReplicaAssignmentUnderlying.flatMap {
+ case (topic, topicReplicaAssignment) => topicReplicaAssignment.collect {
+ case (partition, replicas) if replicas.contains(brokerId) =>
+ PartitionAndReplica(new TopicPartition(topic, partition), brokerId)
+ }
}
- }.toSet
+ }
}
def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
- partitionReplicaAssignment
- .filter { case (topicPartition, _) => topicPartition.topic == topic }
- .flatMap { case (topicPartition, replicas) =>
- replicas.map(PartitionAndReplica(topicPartition, _))
- }.toSet
+ partitionReplicaAssignmentUnderlying.getOrElse(topic, mutable.Map.empty).flatMap {
+ case (partition, replicas) => replicas.map(r => PartitionAndReplica(new TopicPartition(topic, partition), r))
+ }.toSet
}
- def partitionsForTopic(topic: String): collection.Set[TopicPartition] =
- partitionReplicaAssignment.keySet.filter(topicPartition => topicPartition.topic == topic)
+ def partitionsForTopic(topic: String): collection.Set[TopicPartition] = {
+ partitionReplicaAssignmentUnderlying.getOrElse(topic, mutable.Map.empty).map {
+ case (partition, _) => new TopicPartition(topic, partition)
+ }.toSet
+ }
def allLiveReplicas(): Set[PartitionAndReplica] = {
replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica =>
@@ -98,10 +138,24 @@ class ControllerContext {
}
}
- def removeTopic(topic: String) = {
- partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicPartition, _) => topicPartition.topic != topic }
- partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicPartition, _) => topicPartition.topic != topic }
- allTopics -= topic
+ def resetContext(): Unit = {
+ if (controllerChannelManager != null) {
+ controllerChannelManager.shutdown()
+ controllerChannelManager = null
+ }
+ shuttingDownBrokerIds.clear()
+ epoch = 0
+ epochZkVersion = 0
+ clearTopicsState()
+ liveBrokers = Set.empty
}
+ def removeTopic(topic: String): Unit = {
+ allTopics -= topic
+ partitionReplicaAssignmentUnderlying.remove(topic)
+ partitionLeadershipInfo.foreach {
+ case (topicPartition, _) if topicPartition.topic == topic => partitionLeadershipInfo.remove(topicPartition)
+ case _ =>
+ }
+ }
}
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2cb3f7c..4778a7a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -309,7 +309,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
replicaStateMachine.shutdown()
zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
- resetControllerContext()
+ controllerContext.resetContext()
info("Resigned")
}
@@ -569,28 +569,28 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
}
val newReplicas = reassignedPartitionContext.newReplicas
val topic = tp.topic
- controllerContext.partitionReplicaAssignment.get(tp) match {
- case Some(assignedReplicas) =>
- if (assignedReplicas == newReplicas) {
- info(s"Partition $tp to be reassigned is already assigned to replicas " +
- s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
- removePartitionFromReassignedPartitions(tp)
- } else {
- try {
- info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
- // first register ISR change listener
- reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
- // mark topic ineligible for deletion for the partitions being reassigned
- topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
- onPartitionReassignment(tp, reassignedPartitionContext)
- } catch {
- case e: Throwable =>
- error(s"Error completing reassignment of partition $tp", e)
- // remove the partition from the admin path to unblock the admin client
- removePartitionFromReassignedPartitions(tp)
- }
+ val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
+ if (assignedReplicas.nonEmpty) {
+ if (assignedReplicas == newReplicas) {
+ info(s"Partition $tp to be reassigned is already assigned to replicas " +
+ s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
+ removePartitionFromReassignedPartitions(tp)
+ } else {
+ try {
+ info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
+ // first register ISR change listener
+ reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
+ // mark topic ineligible for deletion for the partitions being reassigned
+ topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
+ onPartitionReassignment(tp, reassignedPartitionContext)
+ } catch {
+ case e: Throwable =>
+ error(s"Error completing reassignment of partition $tp", e)
+ // remove the partition from the admin path to unblock the admin client
+ removePartitionFromReassignedPartitions(tp)
}
- case None =>
+ }
+ } else {
error(s"Ignoring request to reassign partition $tp that doesn't exist.")
removePartitionFromReassignedPartitions(tp)
}
@@ -643,8 +643,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
- controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
- controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
+ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach {
+ case (topicPartition, assignedReplicas) => controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
+ }
+ controllerContext.partitionLeadershipInfo.clear()
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// register broker modifications handlers
registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
@@ -662,10 +664,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
// check if they are already completed or topic was deleted
val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
- val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
- val topicDeleted = replicasOpt.isEmpty
+ val replicas = controllerContext.partitionReplicaAssignment(partition)
+ val topicDeleted = replicas.isEmpty
val successful =
- if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
+ if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicas.head else false
successful || topicDeleted
}
val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
@@ -678,21 +680,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
pendingPreferredReplicaElections
}
- private def resetControllerContext(): Unit = {
- if (controllerContext.controllerChannelManager != null) {
- controllerContext.controllerChannelManager.shutdown()
- controllerContext.controllerChannelManager = null
- }
- controllerContext.shuttingDownBrokerIds.clear()
- controllerContext.epoch = 0
- controllerContext.epochZkVersion = 0
- controllerContext.allTopics = Set.empty
- controllerContext.partitionReplicaAssignment.clear()
- controllerContext.partitionLeadershipInfo.clear()
- controllerContext.partitionsBeingReassigned.clear()
- controllerContext.liveBrokers = Set.empty
- }
-
private def initializePartitionReassignment() {
// read the partitions being reassigned from zookeeper path /admin/reassign_partitions
val partitionsBeingReassigned = zkClient.getPartitionReassignment
@@ -706,9 +693,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
val topicsToBeDeleted = zkClient.getTopicDeletions.toSet
- val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) =>
- replicas.exists(r => !controllerContext.isReplicaOnline(r, partition))
- }.keySet.map(_.topic)
+ val topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic => {
+ val replicasForTopic = controllerContext.replicasForTopic(topic)
+ replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, r.topicPartition))
+ }}
val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
@@ -722,7 +710,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
controllerContext.controllerChannelManager.startup()
}
- private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
+ private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq) {
val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -742,7 +730,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
// change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
// request to the current or new leader. This will prevent it from adding the old replicas to the ISR
val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicPartition)
- controllerContext.partitionReplicaAssignment.put(topicPartition, reassignedReplicas)
+ controllerContext.updatePartitionReplicaAssignment(topicPartition, reassignedReplicas)
if (!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader")
@@ -778,14 +766,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private def updateAssignedReplicasForPartition(partition: TopicPartition,
replicas: Seq[Int]) {
- val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic)
- partitionsAndReplicasForThisTopic.put(partition, replicas)
- val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
+ controllerContext.updatePartitionReplicaAssignment(partition, replicas)
+ val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, controllerContext.partitionReplicaAssignmentForTopic(partition.topic))
setDataResponse.resultCode match {
case Code.OK =>
info(s"Updated assigned replicas for partition $partition being reassigned to ${replicas.mkString(",")}")
// update the assigned replica list after a successful zookeeper write
- controllerContext.partitionReplicaAssignment.put(partition, replicas)
+ controllerContext.updatePartitionReplicaAssignment(partition, replicas)
case Code.NONODE => throw new IllegalStateException(s"Topic ${partition.topic} doesn't exist")
case _ => throw new KafkaException(setDataResponse.resultException.get)
}
@@ -971,9 +958,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
private def checkAndTriggerAutoLeaderRebalance(): Unit = {
trace("Checking need to trigger auto leader balancing")
val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicPartition, Seq[Int]]] =
- controllerContext.partitionReplicaAssignment.filterNot { case (tp, _) =>
- topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
- }.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
+ controllerContext.allPartitions.filterNot {
+ tp => topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
+ }.map { tp =>
+ (tp, controllerContext.partitionReplicaAssignment(tp) )
+ }.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
+
debug(s"Preferred replicas by broker $preferredReplicasForTopicsByBrokers")
// for each broker, check if a preferred replica election needs to be triggered
@@ -1155,7 +1145,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
if (!isActive) {
0
} else {
- controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
+ controllerContext.allPartitions.count { topicPartition =>
+ val replicas = controllerContext.partitionReplicaAssignment(topicPartition)
val preferredReplica = replicas.head
val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
leadershipInfo.map(_.leaderAndIsr.leader != preferredReplica).getOrElse(false) &&
@@ -1273,9 +1264,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
- controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
- !deletedTopics.contains(p._1.topic))
- controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
+ deletedTopics.foreach(controllerContext.removeTopic)
+ addedPartitionReplicaAssignment.foreach {
+ case (topicAndPartition, newReplicas) => controllerContext.updatePartitionReplicaAssignment(topicAndPartition, newReplicas)
+ }
info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
s"[$addedPartitionReplicaAssignment]")
if (addedPartitionReplicaAssignment.nonEmpty)
@@ -1315,8 +1307,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
override def process(): Unit = {
if (!isActive) return
val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
- val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
- !controllerContext.partitionReplicaAssignment.contains(p._1))
+ val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+ controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
+ }
if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
if (partitionsToBeAdded.nonEmpty) {
warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
@@ -1330,7 +1323,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
else {
if (partitionsToBeAdded.nonEmpty) {
info(s"New partitions to be added $partitionsToBeAdded")
- controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
+ partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
+ controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
+ }
onNewPartitionCreation(partitionsToBeAdded.keySet)
}
}
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 2e27272..74bc59f 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -76,7 +76,7 @@ class PartitionStateMachine(config: KafkaConfig,
* zookeeper
*/
private def initializePartitionState() {
- for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
+ for (topicPartition <- controllerContext.allPartitions) {
// check if leader and isr path exists for partition. If not, then it is in NEW state
controllerContext.partitionLeadershipInfo.get(topicPartition) match {
case Some(currentLeaderIsrAndEpoch) =>
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 85af764..a2d04e6 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -80,7 +80,8 @@ class ReplicaStateMachine(config: KafkaConfig,
* in zookeeper
*/
private def initializeReplicaState() {
- controllerContext.partitionReplicaAssignment.foreach { case (partition, replicas) =>
+ controllerContext.allPartitions.foreach { partition =>
+ val replicas = controllerContext.partitionReplicaAssignment(partition)
replicas.foreach { replicaId =>
val partitionAndReplica = PartitionAndReplica(partition, replicaId)
if (controllerContext.isReplicaOnline(replicaId, partition))
@@ -181,7 +182,7 @@ class ReplicaStateMachine(config: KafkaConfig,
case NewReplica =>
val assignment = controllerContext.partitionReplicaAssignment(partition)
if (!assignment.contains(replicaId)) {
- controllerContext.partitionReplicaAssignment.put(partition, assignment :+ replicaId)
+ controllerContext.updatePartitionReplicaAssignment(partition, assignment :+ replicaId)
}
case _ =>
controllerContext.partitionLeadershipInfo.get(partition) match {
@@ -237,7 +238,7 @@ class ReplicaStateMachine(config: KafkaConfig,
case NonExistentReplica =>
validReplicas.foreach { replica =>
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicPartition)
- controllerContext.partitionReplicaAssignment.put(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
+ controllerContext.updatePartitionReplicaAssignment(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), NonExistentReplica)
replicaState.remove(replica)
}
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index b1d8394..6e14551 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -255,9 +255,8 @@ class TopicDeletionManager(controller: KafkaController,
// send update metadata so that brokers stop serving data for topics to be deleted
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
- val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
topics.foreach { topic =>
- onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
+ onPartitionDeletion(controllerContext.partitionsForTopic(topic))
}
}
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 32e0d43..52f4599 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -80,7 +80,7 @@ class PartitionStateMachineTest extends JUnitSuite {
@Test
def testNewPartitionToOnlinePartitionTransition(): Unit = {
controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
partitionState.put(partition, NewPartition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -98,7 +98,7 @@ class PartitionStateMachineTest extends JUnitSuite {
@Test
def testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates(): Unit = {
controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
partitionState.put(partition, NewPartition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -114,7 +114,7 @@ class PartitionStateMachineTest extends JUnitSuite {
@Test
def testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates(): Unit = {
controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
partitionState.put(partition, NewPartition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -144,7 +144,7 @@ class PartitionStateMachineTest extends JUnitSuite {
@Test
def testOnlinePartitionToOnlineTransition(): Unit = {
controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
partitionState.put(partition, OnlinePartition)
val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId))
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
@@ -175,7 +175,7 @@ class PartitionStateMachineTest extends JUnitSuite {
val otherBrokerId = brokerId + 1
controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0), TestUtils.createBroker(otherBrokerId, "host", 0))
controllerContext.shuttingDownBrokerIds.add(brokerId)
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId, otherBrokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId, otherBrokerId))
partitionState.put(partition, OnlinePartition)
val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId, otherBrokerId))
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
@@ -226,7 +226,7 @@ class PartitionStateMachineTest extends JUnitSuite {
@Test
def testOfflinePartitionToOnlinePartitionTransition(): Unit = {
controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
partitionState.put(partition, OfflinePartition)
val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
@@ -257,7 +257,7 @@ class PartitionStateMachineTest extends JUnitSuite {
@Test
def testOfflinePartitionToOnlinePartitionTransitionZkUtilsExceptionFromStateLookup(): Unit = {
controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
partitionState.put(partition, OfflinePartition)
val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
@@ -278,7 +278,7 @@ class PartitionStateMachineTest extends JUnitSuite {
@Test
def testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup(): Unit = {
controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
partitionState.put(partition, OfflinePartition)
val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 4d38aac..6a961a5 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -104,7 +104,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
@Test
def testNewReplicaToOnlineReplicaTransition(): Unit = {
replicaState.put(replica, NewReplica)
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
assertEquals(OnlineReplica, replicaState(replica))
}
@@ -150,7 +150,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
@Test
def testOnlineReplicaToOnlineReplicaTransition(): Unit = {
replicaState.put(replica, OnlineReplica)
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -168,7 +168,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
val otherBrokerId = brokerId + 1
val replicaIds = List(brokerId, otherBrokerId)
replicaState.put(replica, OnlineReplica)
- controllerContext.partitionReplicaAssignment.put(partition, replicaIds)
+ controllerContext.updatePartitionReplicaAssignment(partition, replicaIds)
val leaderAndIsr = LeaderAndIsr(brokerId, replicaIds)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -225,7 +225,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
@Test
def testOfflineReplicaToOnlineReplicaTransition(): Unit = {
replicaState.put(replica, OfflineReplica)
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -299,7 +299,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
@Test
def testReplicaDeletionSuccessfulToNonexistentReplicaTransition(): Unit = {
replicaState.put(replica, ReplicaDeletionSuccessful)
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
replicaStateMachine.handleStateChanges(replicas, NonExistentReplica)
assertEquals(Seq.empty, controllerContext.partitionReplicaAssignment(partition))
assertEquals(None, replicaState.get(replica))
@@ -343,7 +343,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
@Test
def testReplicaDeletionIneligibleToOnlineReplicaTransition(): Unit = {
replicaState.put(replica, ReplicaDeletionIneligible)
- controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+ controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
--
To stop receiving notification emails like this one, please contact
junrao@apache.org.