You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/03/30 05:08:39 UTC

[kafka] branch trunk updated: KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2ef6ee2  KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668)
2ef6ee2 is described below

commit 2ef6ee2338178c7501f5bd4c7cce5f4cea9d3e17
Author: gitlw <lu...@gmail.com>
AuthorDate: Thu Mar 29 22:08:28 2018 -0700

    KAFKA-6630: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (#4668)
    
    Reviewed by Jun Rao <ju...@gmail.com>
---
 .../scala/kafka/controller/ControllerContext.scala |  90 ++++++++++++----
 .../scala/kafka/controller/KafkaController.scala   | 117 ++++++++++-----------
 .../kafka/controller/PartitionStateMachine.scala   |   2 +-
 .../kafka/controller/ReplicaStateMachine.scala     |   7 +-
 .../kafka/controller/TopicDeletionManager.scala    |   3 +-
 .../controller/PartitionStateMachineTest.scala     |  16 +--
 .../kafka/controller/ReplicaStateMachineTest.scala |  12 +--
 7 files changed, 148 insertions(+), 99 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 541bce8..f4671cf 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -31,14 +31,46 @@ class ControllerContext {
   var epoch: Int = KafkaController.InitialControllerEpoch - 1
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
   var allTopics: Set[String] = Set.empty
-  var partitionReplicaAssignment: mutable.Map[TopicPartition, Seq[Int]] = mutable.Map.empty
-  var partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
+  private var partitionReplicaAssignmentUnderlying: mutable.Map[String, mutable.Map[Int, Seq[Int]]] = mutable.Map.empty
+  val partitionLeadershipInfo: mutable.Map[TopicPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
   val partitionsBeingReassigned: mutable.Map[TopicPartition, ReassignedPartitionsContext] = mutable.Map.empty
   val replicasOnOfflineDirs: mutable.Map[Int, Set[TopicPartition]] = mutable.Map.empty
 
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
 
+  def partitionReplicaAssignment(topicPartition: TopicPartition): Seq[Int] = {
+    partitionReplicaAssignmentUnderlying.getOrElse(topicPartition.topic, mutable.Map.empty)
+      .getOrElse(topicPartition.partition, Seq.empty)
+  }
+
+  private def clearTopicsState(): Unit = {
+    allTopics = Set.empty
+    partitionReplicaAssignmentUnderlying.clear()
+    partitionLeadershipInfo.clear()
+    partitionsBeingReassigned.clear()
+    replicasOnOfflineDirs.clear()
+  }
+
+  def updatePartitionReplicaAssignment(topicPartition: TopicPartition, newReplicas: Seq[Int]): Unit = {
+    partitionReplicaAssignmentUnderlying.getOrElseUpdate(topicPartition.topic, mutable.Map.empty)
+      .put(topicPartition.partition, newReplicas)
+  }
+
+  def partitionReplicaAssignmentForTopic(topic : String): Map[TopicPartition, Seq[Int]] = {
+    partitionReplicaAssignmentUnderlying.getOrElse(topic, Map.empty).map {
+      case (partition, replicas) => (new TopicPartition(topic, partition), replicas)
+    }.toMap
+  }
+
+  def allPartitions: Set[TopicPartition] = {
+    partitionReplicaAssignmentUnderlying.flatMap {
+      case (topic, topicReplicaAssignment) => topicReplicaAssignment.map {
+        case (partition, _) => new TopicPartition(topic, partition)
+      }
+    }.toSet
+  }
+
   // setter
   def liveBrokers_=(brokers: Set[Broker]) {
     liveBrokersUnderlying = brokers
@@ -53,8 +85,12 @@ class ControllerContext {
   def liveOrShuttingDownBrokers = liveBrokersUnderlying
 
   def partitionsOnBroker(brokerId: Int): Set[TopicPartition] = {
-    partitionReplicaAssignment.collect {
-      case (topicPartition, replicas) if replicas.contains(brokerId) => topicPartition
+    partitionReplicaAssignmentUnderlying.flatMap {
+      case (topic, topicReplicaAssignment) => topicReplicaAssignment.filter {
+        case (_, replicas) => replicas.contains(brokerId)
+      }.map {
+        case (partition, _) => new TopicPartition(topic, partition)
+      }
     }.toSet
   }
 
@@ -68,22 +104,26 @@ class ControllerContext {
 
   def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
     brokerIds.flatMap { brokerId =>
-      partitionReplicaAssignment.collect { case (topicPartition, replicas) if replicas.contains(brokerId) =>
-        PartitionAndReplica(topicPartition, brokerId)
+      partitionReplicaAssignmentUnderlying.flatMap {
+        case (topic, topicReplicaAssignment) => topicReplicaAssignment.collect {
+          case (partition, replicas)  if replicas.contains(brokerId) =>
+            PartitionAndReplica(new TopicPartition(topic, partition), brokerId)
+        }
       }
-    }.toSet
+    }
   }
 
   def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
-    partitionReplicaAssignment
-      .filter { case (topicPartition, _) => topicPartition.topic == topic }
-      .flatMap { case (topicPartition, replicas) =>
-        replicas.map(PartitionAndReplica(topicPartition, _))
-      }.toSet
+    partitionReplicaAssignmentUnderlying.getOrElse(topic, mutable.Map.empty).flatMap {
+      case (partition, replicas) => replicas.map(r => PartitionAndReplica(new TopicPartition(topic, partition), r))
+    }.toSet
   }
 
-  def partitionsForTopic(topic: String): collection.Set[TopicPartition] =
-    partitionReplicaAssignment.keySet.filter(topicPartition => topicPartition.topic == topic)
+  def partitionsForTopic(topic: String): collection.Set[TopicPartition] = {
+    partitionReplicaAssignmentUnderlying.getOrElse(topic, mutable.Map.empty).map {
+      case (partition, _) => new TopicPartition(topic, partition)
+    }.toSet
+  }
 
   def allLiveReplicas(): Set[PartitionAndReplica] = {
     replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica =>
@@ -98,10 +138,24 @@ class ControllerContext {
     }
   }
 
-  def removeTopic(topic: String) = {
-    partitionLeadershipInfo = partitionLeadershipInfo.filter { case (topicPartition, _) => topicPartition.topic != topic }
-    partitionReplicaAssignment = partitionReplicaAssignment.filter { case (topicPartition, _) => topicPartition.topic != topic }
-    allTopics -= topic
+  def resetContext(): Unit = {
+    if (controllerChannelManager != null) {
+      controllerChannelManager.shutdown()
+      controllerChannelManager = null
+    }
+    shuttingDownBrokerIds.clear()
+    epoch = 0
+    epochZkVersion = 0
+    clearTopicsState()
+    liveBrokers = Set.empty
   }
 
+  def removeTopic(topic: String): Unit = {
+    allTopics -= topic
+    partitionReplicaAssignmentUnderlying.remove(topic)
+    partitionLeadershipInfo.foreach {
+      case (topicPartition, _) if topicPartition.topic == topic => partitionLeadershipInfo.remove(topicPartition)
+      case _ =>
+    }
+  }
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2cb3f7c..4778a7a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -309,7 +309,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     replicaStateMachine.shutdown()
     zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
 
-    resetControllerContext()
+    controllerContext.resetContext()
 
     info("Resigned")
   }
@@ -569,28 +569,28 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         }
         val newReplicas = reassignedPartitionContext.newReplicas
         val topic = tp.topic
-        controllerContext.partitionReplicaAssignment.get(tp) match {
-          case Some(assignedReplicas) =>
-            if (assignedReplicas == newReplicas) {
-              info(s"Partition $tp to be reassigned is already assigned to replicas " +
-                s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
-              removePartitionFromReassignedPartitions(tp)
-            } else {
-              try {
-                info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
-                // first register ISR change listener
-                reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
-                // mark topic ineligible for deletion for the partitions being reassigned
-                topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
-                onPartitionReassignment(tp, reassignedPartitionContext)
-              } catch {
-                case e: Throwable =>
-                  error(s"Error completing reassignment of partition $tp", e)
-                  // remove the partition from the admin path to unblock the admin client
-                  removePartitionFromReassignedPartitions(tp)
-              }
+        val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
+        if (assignedReplicas.nonEmpty) {
+          if (assignedReplicas == newReplicas) {
+            info(s"Partition $tp to be reassigned is already assigned to replicas " +
+              s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
+            removePartitionFromReassignedPartitions(tp)
+          } else {
+            try {
+              info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
+              // first register ISR change listener
+              reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
+              // mark topic ineligible for deletion for the partitions being reassigned
+              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
+              onPartitionReassignment(tp, reassignedPartitionContext)
+            } catch {
+              case e: Throwable =>
+                error(s"Error completing reassignment of partition $tp", e)
+                // remove the partition from the admin path to unblock the admin client
+                removePartitionFromReassignedPartitions(tp)
             }
-          case None =>
+          }
+        } else {
             error(s"Ignoring request to reassign partition $tp that doesn't exist.")
             removePartitionFromReassignedPartitions(tp)
         }
@@ -643,8 +643,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     controllerContext.liveBrokers = zkClient.getAllBrokersInCluster.toSet
     controllerContext.allTopics = zkClient.getAllTopicsInCluster.toSet
     registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
-    controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
-    controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicPartition, LeaderIsrAndControllerEpoch]
+    zkClient.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet).foreach {
+      case (topicPartition, assignedReplicas) => controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
+    }
+    controllerContext.partitionLeadershipInfo.clear()
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
     // register broker modifications handlers
     registerBrokerModificationsHandler(controllerContext.liveBrokers.map(_.id))
@@ -662,10 +664,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     val partitionsUndergoingPreferredReplicaElection = zkClient.getPreferredReplicaElection
     // check if they are already completed or topic was deleted
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
-      val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
-      val topicDeleted = replicasOpt.isEmpty
+      val replicas = controllerContext.partitionReplicaAssignment(partition)
+      val topicDeleted = replicas.isEmpty
       val successful =
-        if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
+        if (!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicas.head else false
       successful || topicDeleted
     }
     val pendingPreferredReplicaElectionsIgnoringTopicDeletion = partitionsUndergoingPreferredReplicaElection -- partitionsThatCompletedPreferredReplicaElection
@@ -678,21 +680,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     pendingPreferredReplicaElections
   }
 
-  private def resetControllerContext(): Unit = {
-    if (controllerContext.controllerChannelManager != null) {
-      controllerContext.controllerChannelManager.shutdown()
-      controllerContext.controllerChannelManager = null
-    }
-    controllerContext.shuttingDownBrokerIds.clear()
-    controllerContext.epoch = 0
-    controllerContext.epochZkVersion = 0
-    controllerContext.allTopics = Set.empty
-    controllerContext.partitionReplicaAssignment.clear()
-    controllerContext.partitionLeadershipInfo.clear()
-    controllerContext.partitionsBeingReassigned.clear()
-    controllerContext.liveBrokers = Set.empty
-  }
-
   private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
     val partitionsBeingReassigned = zkClient.getPartitionReassignment
@@ -706,9 +693,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
     val topicsToBeDeleted = zkClient.getTopicDeletions.toSet
-    val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) =>
-      replicas.exists(r => !controllerContext.isReplicaOnline(r, partition))
-    }.keySet.map(_.topic)
+    val topicsWithOfflineReplicas = controllerContext.allTopics.filter { topic => {
+      val replicasForTopic = controllerContext.replicasForTopic(topic)
+      replicasForTopic.exists(r => !controllerContext.isReplicaOnline(r.replica, r.topicPartition))
+    }}
     val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
     val topicsIneligibleForDeletion = topicsWithOfflineReplicas | topicsForWhichPartitionReassignmentIsInProgress
     info(s"List of topics to be deleted: ${topicsToBeDeleted.mkString(",")}")
@@ -722,7 +710,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     controllerContext.controllerChannelManager.startup()
   }
 
-  private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
+  private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq) {
     val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
     leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
       controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -742,7 +730,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
     // request to the current or new leader. This will prevent it from adding the old replicas to the ISR
     val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicPartition)
-    controllerContext.partitionReplicaAssignment.put(topicPartition, reassignedReplicas)
+    controllerContext.updatePartitionReplicaAssignment(topicPartition, reassignedReplicas)
     if (!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
       info(s"Leader $currentLeader for partition $topicPartition being reassigned, " +
         s"is not in the new list of replicas ${reassignedReplicas.mkString(",")}. Re-electing leader")
@@ -778,14 +766,13 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private def updateAssignedReplicasForPartition(partition: TopicPartition,
                                                  replicas: Seq[Int]) {
-    val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic == partition.topic)
-    partitionsAndReplicasForThisTopic.put(partition, replicas)
-    val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
+    controllerContext.updatePartitionReplicaAssignment(partition, replicas)
+    val setDataResponse = zkClient.setTopicAssignmentRaw(partition.topic, controllerContext.partitionReplicaAssignmentForTopic(partition.topic))
     setDataResponse.resultCode match {
       case Code.OK =>
         info(s"Updated assigned replicas for partition $partition being reassigned to ${replicas.mkString(",")}")
         // update the assigned replica list after a successful zookeeper write
-        controllerContext.partitionReplicaAssignment.put(partition, replicas)
+        controllerContext.updatePartitionReplicaAssignment(partition, replicas)
       case Code.NONODE => throw new IllegalStateException(s"Topic ${partition.topic} doesn't exist")
       case _ => throw new KafkaException(setDataResponse.resultException.get)
     }
@@ -971,9 +958,12 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   private def checkAndTriggerAutoLeaderRebalance(): Unit = {
     trace("Checking need to trigger auto leader balancing")
     val preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicPartition, Seq[Int]]] =
-      controllerContext.partitionReplicaAssignment.filterNot { case (tp, _) =>
-        topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
-      }.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
+      controllerContext.allPartitions.filterNot {
+        tp => topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
+      }.map { tp =>
+        (tp, controllerContext.partitionReplicaAssignment(tp) )
+      }.toMap.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
+
     debug(s"Preferred replicas by broker $preferredReplicasForTopicsByBrokers")
 
     // for each broker, check if a preferred replica election needs to be triggered
@@ -1155,7 +1145,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       if (!isActive) {
         0
       } else {
-        controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
+        controllerContext.allPartitions.count { topicPartition =>
+          val replicas = controllerContext.partitionReplicaAssignment(topicPartition)
           val preferredReplica = replicas.head
           val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
           leadershipInfo.map(_.leaderAndIsr.leader != preferredReplica).getOrElse(false) &&
@@ -1273,9 +1264,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
       registerPartitionModificationsHandlers(newTopics.toSeq)
       val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
-      controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
-        !deletedTopics.contains(p._1.topic))
-      controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
+      deletedTopics.foreach(controllerContext.removeTopic)
+      addedPartitionReplicaAssignment.foreach {
+        case (topicAndPartition, newReplicas) => controllerContext.updatePartitionReplicaAssignment(topicAndPartition, newReplicas)
+      }
       info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
         s"[$addedPartitionReplicaAssignment]")
       if (addedPartitionReplicaAssignment.nonEmpty)
@@ -1315,8 +1307,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     override def process(): Unit = {
       if (!isActive) return
       val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
-      val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
-        !controllerContext.partitionReplicaAssignment.contains(p._1))
+      val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+        controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
+      }
       if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
         if (partitionsToBeAdded.nonEmpty) {
           warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
@@ -1330,7 +1323,9 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       else {
         if (partitionsToBeAdded.nonEmpty) {
           info(s"New partitions to be added $partitionsToBeAdded")
-          controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
+          partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
+            controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
+          }
           onNewPartitionCreation(partitionsToBeAdded.keySet)
         }
       }
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 2e27272..74bc59f 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -76,7 +76,7 @@ class PartitionStateMachine(config: KafkaConfig,
    * zookeeper
    */
   private def initializePartitionState() {
-    for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
+    for (topicPartition <- controllerContext.allPartitions) {
       // check if leader and isr path exists for partition. If not, then it is in NEW state
       controllerContext.partitionLeadershipInfo.get(topicPartition) match {
         case Some(currentLeaderIsrAndEpoch) =>
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 85af764..a2d04e6 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -80,7 +80,8 @@ class ReplicaStateMachine(config: KafkaConfig,
    * in zookeeper
    */
   private def initializeReplicaState() {
-    controllerContext.partitionReplicaAssignment.foreach { case (partition, replicas) =>
+    controllerContext.allPartitions.foreach { partition =>
+      val replicas = controllerContext.partitionReplicaAssignment(partition)
       replicas.foreach { replicaId =>
         val partitionAndReplica = PartitionAndReplica(partition, replicaId)
         if (controllerContext.isReplicaOnline(replicaId, partition))
@@ -181,7 +182,7 @@ class ReplicaStateMachine(config: KafkaConfig,
             case NewReplica =>
               val assignment = controllerContext.partitionReplicaAssignment(partition)
               if (!assignment.contains(replicaId)) {
-                controllerContext.partitionReplicaAssignment.put(partition, assignment :+ replicaId)
+                controllerContext.updatePartitionReplicaAssignment(partition, assignment :+ replicaId)
               }
             case _ =>
               controllerContext.partitionLeadershipInfo.get(partition) match {
@@ -237,7 +238,7 @@ class ReplicaStateMachine(config: KafkaConfig,
       case NonExistentReplica =>
         validReplicas.foreach { replica =>
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicPartition)
-          controllerContext.partitionReplicaAssignment.put(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
+          controllerContext.updatePartitionReplicaAssignment(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
           logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), NonExistentReplica)
           replicaState.remove(replica)
         }
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index b1d8394..6e14551 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -255,9 +255,8 @@ class TopicDeletionManager(controller: KafkaController,
     // send update metadata so that brokers stop serving data for topics to be deleted
     val partitions = topics.flatMap(controllerContext.partitionsForTopic)
     controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
-    val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
     topics.foreach { topic =>
-      onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
+      onPartitionDeletion(controllerContext.partitionsForTopic(topic))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 32e0d43..52f4599 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -80,7 +80,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   @Test
   def testNewPartitionToOnlinePartitionTransition(): Unit = {
     controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -98,7 +98,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   @Test
   def testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates(): Unit = {
     controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -114,7 +114,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   @Test
   def testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates(): Unit = {
     controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     partitionState.put(partition, NewPartition)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -144,7 +144,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   @Test
   def testOnlinePartitionToOnlineTransition(): Unit = {
     controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     partitionState.put(partition, OnlinePartition)
     val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
@@ -175,7 +175,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     val otherBrokerId = brokerId + 1
     controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0), TestUtils.createBroker(otherBrokerId, "host", 0))
     controllerContext.shuttingDownBrokerIds.add(brokerId)
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId, otherBrokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId, otherBrokerId))
     partitionState.put(partition, OnlinePartition)
     val leaderAndIsr = LeaderAndIsr(brokerId, List(brokerId, otherBrokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
@@ -226,7 +226,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   @Test
   def testOfflinePartitionToOnlinePartitionTransition(): Unit = {
     controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     partitionState.put(partition, OfflinePartition)
     val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
@@ -257,7 +257,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   @Test
   def testOfflinePartitionToOnlinePartitionTransitionZkUtilsExceptionFromStateLookup(): Unit = {
     controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     partitionState.put(partition, OfflinePartition)
     val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
@@ -278,7 +278,7 @@ class PartitionStateMachineTest extends JUnitSuite {
   @Test
   def testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup(): Unit = {
     controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     partitionState.put(partition, OfflinePartition)
     val leaderAndIsr = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 4d38aac..6a961a5 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -104,7 +104,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
   @Test
   def testNewReplicaToOnlineReplicaTransition(): Unit = {
     replicaState.put(replica, NewReplica)
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
     assertEquals(OnlineReplica, replicaState(replica))
   }
@@ -150,7 +150,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
   @Test
   def testOnlineReplicaToOnlineReplicaTransition(): Unit = {
     replicaState.put(replica, OnlineReplica)
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -168,7 +168,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
     val otherBrokerId = brokerId + 1
     val replicaIds = List(brokerId, otherBrokerId)
     replicaState.put(replica, OnlineReplica)
-    controllerContext.partitionReplicaAssignment.put(partition, replicaIds)
+    controllerContext.updatePartitionReplicaAssignment(partition, replicaIds)
     val leaderAndIsr = LeaderAndIsr(brokerId, replicaIds)
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
@@ -225,7 +225,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
   @Test
   def testOfflineReplicaToOnlineReplicaTransition(): Unit = {
     replicaState.put(replica, OfflineReplica)
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
@@ -299,7 +299,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
   @Test
   def testReplicaDeletionSuccessfulToNonexistentReplicaTransition(): Unit = {
     replicaState.put(replica, ReplicaDeletionSuccessful)
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     replicaStateMachine.handleStateChanges(replicas, NonExistentReplica)
     assertEquals(Seq.empty, controllerContext.partitionReplicaAssignment(partition))
     assertEquals(None, replicaState.get(replica))
@@ -343,7 +343,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
   @Test
   def testReplicaDeletionIneligibleToOnlineReplicaTransition(): Unit = {
     replicaState.put(replica, ReplicaDeletionIneligible)
-    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    controllerContext.updatePartitionReplicaAssignment(partition, Seq(brokerId))
     val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())

-- 
To stop receiving notification emails like this one, please contact
junrao@apache.org.