Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/14 01:58:58 UTC

[kafka] branch trunk updated: KAFKA-8294; Batch StopReplica requests when possible and improve test coverage (#6642)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e4007a6  KAFKA-8294; Batch StopReplica requests when possible and improve test coverage (#6642)
e4007a6 is described below

commit e4007a6408236d9b52b355fcf3c3a80fe5f570ff
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon May 13 18:58:42 2019 -0700

    KAFKA-8294; Batch StopReplica requests when possible and improve test coverage (#6642)
    
    The main problem we are trying to solve here is the batching of StopReplica requests and the lack of test coverage for `ControllerChannelManager`. Addressing the first problem was straightforward, but the second problem required quite a bit of work because of the dependence on `KafkaController` for all of the events. It seemed to make sense to separate the events from the processing of events so that we could remove this dependence and improve testability. With the refactoring, I was  [...]
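
    To make the batching change concrete, here is a minimal Scala sketch of
    the idea (illustrative names, not the committed code): per broker, the
    queued stop-replica entries are split by the delete flag, so each broker
    receives at most two StopReplica requests per batch instead of one
    request per replica.

        // Hypothetical, simplified stand-in for the queued per-replica entries.
        case class StopReplicaEntry(topicPartition: String, deletePartition: Boolean)

        // Group each broker's entries by the delete flag, yielding at most two
        // (brokerId, deletePartitions, partitions) requests per broker.
        def batchedStopReplicaRequests(
            queued: Map[Int, Seq[StopReplicaEntry]]): Seq[(Int, Boolean, Seq[String])] =
          queued.toSeq.flatMap { case (brokerId, entries) =>
            val (withDelete, withoutDelete) = entries.partition(_.deletePartition)
            Seq((brokerId, true, withDelete.map(_.topicPartition)),
                (brokerId, false, withoutDelete.map(_.topicPartition)))
              .filter { case (_, _, partitions) => partitions.nonEmpty }
          }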
    
    While refactoring this logic, I found that the event queue time metric was not being computed correctly. The problem is that many of the controller events were singleton objects which inherited the `enqueueTimeMs` field from the `ControllerEvent` trait. That field was never updated, so the reported queue time was skewed.
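
    A simplified sketch of the bug and the fix (the shape mirrors the
    `QueuedEvent` wrapper added in this commit; details are trimmed):

        // Buggy shape: a singleton event object carries one mutable timestamp,
        // shared across every enqueue of that event, so dequeues compute queue
        // time from a stale value.
        trait Event { var enqueueTimeMs: Long = 0L }
        case object Startup extends Event

        // Fixed shape: the timestamp lives in a per-enqueue wrapper, so each
        // dequeue sees the time at which the event was actually enqueued.
        class QueuedEvent(val event: Event, val enqueueTimeMs: Long)

        def queueTimeMs(dequeued: QueuedEvent, nowMs: Long): Long =
          nowMs - dequeued.enqueueTimeMs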
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 .../kafka/common/requests/StopReplicaRequest.java  |   12 +-
 .../controller/ControllerChannelManager.scala      |  276 +++--
 .../scala/kafka/controller/ControllerContext.scala |    4 +
 .../kafka/controller/ControllerEventManager.scala  |   84 +-
 .../scala/kafka/controller/KafkaController.scala   | 1129 ++++++++++----------
 .../kafka/controller/TopicDeletionManager.scala    |    7 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |    2 +-
 .../controller/ControllerChannelManagerTest.scala  |  708 ++++++++++++
 .../controller/ControllerEventManagerTest.scala    |   77 +-
 .../kafka/controller/ControllerFailoverTest.scala  |   19 +-
 .../controller/ControllerIntegrationTest.scala     |   23 +-
 .../kafka/controller/ControllerTestUtils.scala     |   35 -
 .../controller/TopicDeletionManagerTest.scala      |   24 +
 .../kafka/server/BrokerEpochIntegrationTest.scala  |   29 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |    3 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |    2 +-
 16 files changed, 1644 insertions(+), 790 deletions(-)
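
The testability half of this change hinges on the `ControllerEventProcessor`
trait introduced in the diff below: events become plain data, and the event
manager delegates processing to whichever processor it is handed, so tests can
supply a stub instead of a full `KafkaController`. A minimal sketch of that
shape (simplified from the diff; the test double is hypothetical):

    sealed trait ControllerEvent
    case object Startup extends ControllerEvent

    // Processing logic lives behind a trait that the event manager calls.
    trait ControllerEventProcessor {
      def process(event: ControllerEvent): Unit
      def preempt(event: ControllerEvent): Unit
    }

    // A test double that simply records the events it processes.
    class RecordingProcessor extends ControllerEventProcessor {
      val processed = scala.collection.mutable.Buffer.empty[ControllerEvent]
      def process(event: ControllerEvent): Unit = processed += event
      def preempt(event: ControllerEvent): Unit = ()
    }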

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 7c03c79..bad9573 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -27,11 +27,11 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
@@ -71,10 +71,10 @@ public class StopReplicaRequest extends AbstractControlRequest {
 
     public static class Builder extends AbstractControlRequest.Builder<StopReplicaRequest> {
         private final boolean deletePartitions;
-        private final Set<TopicPartition> partitions;
+        private final Collection<TopicPartition> partitions;
 
         public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, boolean deletePartitions,
-                       Set<TopicPartition> partitions) {
+                       Collection<TopicPartition> partitions) {
             super(ApiKeys.STOP_REPLICA, version, controllerId, controllerEpoch, brokerEpoch);
             this.deletePartitions = deletePartitions;
             this.partitions = partitions;
@@ -101,10 +101,10 @@ public class StopReplicaRequest extends AbstractControlRequest {
     }
 
     private final boolean deletePartitions;
-    private final Set<TopicPartition> partitions;
+    private final Collection<TopicPartition> partitions;
 
     private StopReplicaRequest(int controllerId, int controllerEpoch, long brokerEpoch, boolean deletePartitions,
-                               Set<TopicPartition> partitions, short version) {
+                               Collection<TopicPartition> partitions, short version) {
         super(ApiKeys.STOP_REPLICA, version, controllerId, controllerEpoch, brokerEpoch);
         this.deletePartitions = deletePartitions;
         this.partitions = partitions;
@@ -158,7 +158,7 @@ public class StopReplicaRequest extends AbstractControlRequest {
         return deletePartitions;
     }
 
-    public Set<TopicPartition> partitions() {
+    public Collection<TopicPartition> partitions() {
         return partitions;
     }
 
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ca6c00a..92aa421 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, ListBuffer}
 import scala.collection.{Set, mutable}
 
 object ControllerChannelManager {
@@ -45,9 +45,14 @@ object ControllerChannelManager {
   val RequestRateAndQueueTimeMetricName = "RequestRateAndQueueTimeMs"
 }
 
-class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics,
-                               stateChangeLogger: StateChangeLogger, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class ControllerChannelManager(controllerContext: ControllerContext,
+                               config: KafkaConfig,
+                               time: Time,
+                               metrics: Metrics,
+                               stateChangeLogger: StateChangeLogger,
+                               threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   import ControllerChannelManager._
+
   protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
   private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "]: "
@@ -61,9 +66,9 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
     }
   )
 
-  controllerContext.liveOrShuttingDownBrokers.foreach(addNewBroker)
-
   def startup() = {
+    controllerContext.liveOrShuttingDownBrokers.foreach(addNewBroker)
+
     brokerLock synchronized {
       brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
     }
@@ -71,17 +76,17 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
 
   def shutdown() = {
     brokerLock synchronized {
-      brokerStateInfo.values.foreach(removeExistingBroker)
+      brokerStateInfo.values.toList.foreach(removeExistingBroker)
     }
   }
 
-  def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
+  def sendRequest(brokerId: Int, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
                   callback: AbstractResponse => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
-          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback, time.milliseconds()))
+          stateInfo.messageQueue.put(QueueItem(request.apiKey, request, callback, time.milliseconds()))
         case None =>
           warn(s"Not sending request $request to broker $brokerId, since it is offline.")
       }
@@ -303,14 +308,42 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogger: StateChangeLogger) extends  Logging {
-  val controllerContext = controller.controllerContext
-  val controllerId: Int = controller.config.brokerId
+class ControllerBrokerRequestBatch(config: KafkaConfig,
+                                   controllerChannelManager: ControllerChannelManager,
+                                   controllerEventManager: ControllerEventManager,
+                                   controllerContext: ControllerContext,
+                                   stateChangeLogger: StateChangeLogger)
+  extends AbstractControllerBrokerRequestBatch(config, controllerContext, stateChangeLogger) {
+
+  def sendEvent(event: ControllerEvent): Unit = {
+    controllerEventManager.put(event)
+  }
+
+  def sendRequest(brokerId: Int,
+                  request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
+                  callback: AbstractResponse => Unit = null): Unit = {
+    controllerChannelManager.sendRequest(brokerId, request, callback)
+  }
+
+}
+
+case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean)
+
+abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
+                                                    controllerContext: ControllerContext,
+                                                    stateChangeLogger: StateChangeLogger) extends  Logging {
+  val controllerId: Int = config.brokerId
   val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, LeaderAndIsrRequest.PartitionState]]
-  val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
+  val stopReplicaRequestMap = mutable.Map.empty[Int, ListBuffer[StopReplicaRequestInfo]]
   val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
   val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, UpdateMetadataRequest.PartitionState]
 
+  def sendEvent(event: ControllerEvent): Unit
+
+  def sendRequest(brokerId: Int,
+                  request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
+                  callback: AbstractResponse => Unit = null): Unit
+
   def newBatch() {
     // raise error if the previous batch is not empty
     if (leaderAndIsrRequestMap.nonEmpty)
@@ -332,9 +365,11 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
     updateMetadataRequestPartitionInfoMap.clear()
   }
 
-  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition,
+  def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int],
+                                       topicPartition: TopicPartition,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                                       replicas: Seq[Int], isNew: Boolean) {
+                                       replicas: Seq[Int],
+                                       isNew: Boolean): Unit = {
 
     brokerIds.filter(_ >= 0).foreach { brokerId =>
       val result = leaderAndIsrRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty)
@@ -355,25 +390,14 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
                                       topicPartition: TopicPartition,
                                       deletePartition: Boolean): Unit = {
     brokerIds.filter(_ >= 0).foreach { brokerId =>
-      def topicDeletionCallback(stopReplicaResponse: AbstractResponse): Unit = {
-        controller.eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponse, brokerId))
-      }
-
-      val responseReceivedCallback = if (deletePartition && controllerContext.isTopicDeletionInProgress(topicPartition.topic))
-        topicDeletionCallback _
-      else
-        null
-
-      stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
-      val v = stopReplicaRequestMap(brokerId)
-      stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topicPartition, brokerId),
-        deletePartition, responseReceivedCallback)
+      val stopReplicaInfos = stopReplicaRequestMap.getOrElseUpdate(brokerId, ListBuffer.empty[StopReplicaRequestInfo])
+      stopReplicaInfos.append(StopReplicaRequestInfo(PartitionAndReplica(topicPartition, brokerId), deletePartition))
     }
   }
 
   /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
-                                         partitions: collection.Set[TopicPartition]) {
+                                         partitions: collection.Set[TopicPartition]): Unit = {
 
     def updateMetadataRequestPartitionInfo(partition: TopicPartition, beingDeleted: Boolean) {
       val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
@@ -407,106 +431,124 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
       beingDeleted = controllerContext.topicsToBeDeleted.contains(partition.topic)))
   }
 
-  def sendRequestsToBrokers(controllerEpoch: Int) {
-    try {
-      val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerEpoch)
+  private def sendLeaderAndIsrRequest(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = {
+    val leaderAndIsrRequestVersion: Short =
+      if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
+      else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
+      else 0
+
+    leaderAndIsrRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach {
+      case (broker, leaderAndIsrPartitionStates) =>
+        leaderAndIsrPartitionStates.foreach {
+          case (topicPartition, state) =>
+            val typeOfRequest =
+              if (broker == state.basePartitionState.leader) "become-leader"
+              else "become-follower"
+            stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
+        }
+        val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet
+        val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
+          _.node(config.interBrokerListenerName)
+        }
+        val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
+        val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId, controllerEpoch,
+          brokerEpoch, leaderAndIsrPartitionStates.asJava, leaders.asJava)
+        sendRequest(broker, leaderAndIsrRequestBuilder, (r: AbstractResponse) => sendEvent(LeaderAndIsrResponseReceived(r, broker)))
 
-      val leaderAndIsrRequestVersion: Short =
-        if (controller.config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 2
-        else if (controller.config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
-        else 0
-
-      leaderAndIsrRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach {
-        case (broker, leaderAndIsrPartitionStates) =>
-          leaderAndIsrPartitionStates.foreach {
-            case (topicPartition, state) =>
-              val typeOfRequest =
-                if (broker == state.basePartitionState.leader) "become-leader"
-                else "become-follower"
-              stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
-          }
-          val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet
-          val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
-            _.node(controller.config.interBrokerListenerName)
-          }
-          val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
-          val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId, controllerEpoch,
-            brokerEpoch, leaderAndIsrPartitionStates.asJava, leaders.asJava)
-          controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequestBuilder,
-            (r: AbstractResponse) => controller.eventManager.put(controller.LeaderAndIsrResponseReceived(r, broker)))
+    }
+    leaderAndIsrRequestMap.clear()
+  }
 
-      }
-      leaderAndIsrRequestMap.clear()
+  private def sendUpdateMetadataRequests(controllerEpoch: Int, stateChangeLog: StateChangeLogger): Unit = {
+    updateMetadataRequestPartitionInfoMap.foreach { case (tp, partitionState) =>
+      stateChangeLog.trace(s"Sending UpdateMetadata request $partitionState to brokers $updateMetadataRequestBrokerSet " +
+        s"for partition $tp")
+    }
 
-      updateMetadataRequestPartitionInfoMap.foreach { case (tp, partitionState) =>
-        stateChangeLog.trace(s"Sending UpdateMetadata request $partitionState to brokers $updateMetadataRequestBrokerSet " +
-          s"for partition $tp")
+    val partitionStates = Map.empty ++ updateMetadataRequestPartitionInfoMap
+    val updateMetadataRequestVersion: Short =
+      if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 5
+      else if (config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 4
+      else if (config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
+      else if (config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
+      else if (config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
+      else 0
+
+    val liveBrokers = if (updateMetadataRequestVersion == 0) {
+      // Version 0 of UpdateMetadataRequest only supports PLAINTEXT.
+      controllerContext.liveOrShuttingDownBrokers.map { broker =>
+        val securityProtocol = SecurityProtocol.PLAINTEXT
+        val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+        val node = broker.node(listenerName)
+        val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName))
+        new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
       }
-
-      val partitionStates = Map.empty ++ updateMetadataRequestPartitionInfoMap
-      val updateMetadataRequestVersion: Short =
-        if (controller.config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 5
-        else if (controller.config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 4
-        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
-        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
-        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
-        else 0
-
-      val liveBrokers = if (updateMetadataRequestVersion == 0) {
-        // Version 0 of UpdateMetadataRequest only supports PLAINTEXT.
-        controllerContext.liveOrShuttingDownBrokers.map { broker =>
-          val securityProtocol = SecurityProtocol.PLAINTEXT
-          val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-          val node = broker.node(listenerName)
-          val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName))
-          new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
-        }
-      } else {
-        controllerContext.liveOrShuttingDownBrokers.map { broker =>
-          val endPoints = broker.endPoints.map { endPoint =>
-            new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
-          }
-          new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
+    } else {
+      controllerContext.liveOrShuttingDownBrokers.map { broker =>
+        val endPoints = broker.endPoints.map { endPoint =>
+          new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
         }
+        new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
       }
+    }
 
-      updateMetadataRequestBrokerSet.intersect(controllerContext.liveOrShuttingDownBrokerIds).foreach { broker =>
-        val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
-        val updateMetadataRequest = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch,
-          brokerEpoch, partitionStates.asJava, liveBrokers.asJava)
-        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null)
-      }
-      updateMetadataRequestBrokerSet.clear()
-      updateMetadataRequestPartitionInfoMap.clear()
+    updateMetadataRequestBrokerSet.intersect(controllerContext.liveOrShuttingDownBrokerIds).foreach { broker =>
+      val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
+      val updateMetadataRequest = new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch,
+        brokerEpoch, partitionStates.asJava, liveBrokers.asJava)
+      sendRequest(broker, updateMetadataRequest)
+    }
+    updateMetadataRequestBrokerSet.clear()
+    updateMetadataRequestPartitionInfoMap.clear()
+  }
 
-      val stopReplicaRequestVersion: Short =
-        if (controller.config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
-        else 0
+  private def sendStopReplicaRequests(controllerEpoch: Int): Unit = {
+    val stopReplicaRequestVersion: Short =
+      if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
+      else 0
 
-      stopReplicaRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach { case (broker, replicaInfoList) =>
-        val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
-        val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
-        debug(s"The stop replica request (delete = true) sent to broker $broker is ${stopReplicaWithDelete.mkString(",")}")
-        debug(s"The stop replica request (delete = false) sent to broker $broker is ${stopReplicaWithoutDelete.mkString(",")}")
+    def stopReplicaPartitionDeleteResponseCallback(brokerId: Int)(response: AbstractResponse): Unit = {
+      val stopReplicaResponse = response.asInstanceOf[StopReplicaResponse]
+      val partitionErrorsForDeletingTopics = stopReplicaResponse.responses.asScala.filterKeys { partition =>
+        controllerContext.isTopicDeletionInProgress(partition.topic)
+      }.toMap
 
-        val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => !r.deletePartition && r.callback == null)
-        val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
+      if (partitionErrorsForDeletingTopics.nonEmpty)
+        sendEvent(TopicDeletionStopReplicaResponseReceived(brokerId, stopReplicaResponse.error, partitionErrorsForDeletingTopics))
+    }
 
-        // Send one StopReplicaRequest for all partitions that require neither delete nor callback. This potentially
-        // changes the order in which the requests are sent for the same partitions, but that's OK.
-        val stopReplicaRequest = new StopReplicaRequest.Builder(stopReplicaRequestVersion, controllerId, controllerEpoch,
-          brokerEpoch, false,
-          replicasToGroup.map(_.replica.topicPartition).toSet.asJava)
-        controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest)
-
-        replicasToNotGroup.foreach { r =>
-          val stopReplicaRequest = new StopReplicaRequest.Builder(stopReplicaRequestVersion,
-            controllerId, controllerEpoch, brokerEpoch, r.deletePartition,
-            Set(r.replica.topicPartition).asJava)
-          controller.sendRequest(broker, ApiKeys.STOP_REPLICA, stopReplicaRequest, r.callback)
-        }
+    def createStopReplicaRequest(brokerEpoch: Long, requests: Seq[StopReplicaRequestInfo], deletePartitions: Boolean): StopReplicaRequest.Builder = {
+      val partitions = requests.map(_.replica.topicPartition).asJava
+      new StopReplicaRequest.Builder(stopReplicaRequestVersion, controllerId, controllerEpoch,
+        brokerEpoch, deletePartitions, partitions)
+    }
+
+    stopReplicaRequestMap.filterKeys(controllerContext.liveOrShuttingDownBrokerIds.contains).foreach { case (brokerId, replicaInfoList) =>
+      val (stopReplicaWithDelete, stopReplicaWithoutDelete) = replicaInfoList.partition(r => r.deletePartition)
+      val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(brokerId)
+
+      if (stopReplicaWithDelete.nonEmpty) {
+        debug(s"The stop replica request (delete = true) sent to broker $brokerId is ${stopReplicaWithDelete.mkString(",")}")
+        val stopReplicaRequest = createStopReplicaRequest(brokerEpoch, stopReplicaWithDelete, deletePartitions = true)
+        val callback = stopReplicaPartitionDeleteResponseCallback(brokerId) _
+        sendRequest(brokerId, stopReplicaRequest, callback)
       }
-      stopReplicaRequestMap.clear()
+
+      if (stopReplicaWithoutDelete.nonEmpty) {
+        debug(s"The stop replica request (delete = false) sent to broker $brokerId is ${stopReplicaWithoutDelete.mkString(",")}")
+        val stopReplicaRequest = createStopReplicaRequest(brokerEpoch, stopReplicaWithoutDelete, deletePartitions = false)
+        sendRequest(brokerId, stopReplicaRequest)
+      }
+    }
+    stopReplicaRequestMap.clear()
+  }
+
+  def sendRequestsToBrokers(controllerEpoch: Int): Unit = {
+    try {
+      val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerEpoch)
+      sendLeaderAndIsrRequest(controllerEpoch, stateChangeLog)
+      sendUpdateMetadataRequests(controllerEpoch, stateChangeLog)
+      sendStopReplicaRequests(controllerEpoch)
     } catch {
       case e: Throwable =>
         if (leaderAndIsrRequestMap.nonEmpty) {
@@ -533,5 +575,3 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                      queueSizeGauge: Gauge[Int],
                                      requestRateAndTimeMetrics: Timer)
 
-case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit)
-
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 3069024..573a981 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -202,6 +202,10 @@ class ControllerContext {
     }
   }
 
+  def queueTopicDeletion(topics: Set[String]): Unit = {
+    topicsToBeDeleted ++= topics
+  }
+
   def beginTopicDeletion(topics: Set[String]): Unit = {
     topicsWithDeletionStarted ++= topics
   }
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index a456ce3..f579c2b 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -17,14 +17,14 @@
 
 package kafka.controller
 
-import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.ShutdownableThread
-import org.apache.kafka.common.errors.ControllerMovedException
 import org.apache.kafka.common.utils.Time
 
 import scala.collection._
@@ -33,16 +33,49 @@ import scala.collection.JavaConverters._
 object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
 }
-class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
-                             eventProcessedListener: ControllerEvent => Unit,
-                             controllerMovedListener: () => Unit) extends KafkaMetricsGroup {
+
+trait ControllerEventProcessor {
+  def process(event: ControllerEvent): Unit
+  def preempt(event: ControllerEvent): Unit
+}
+
+class QueuedEvent(val event: ControllerEvent,
+                  val enqueueTimeMs: Long) {
+  val processingStarted = new CountDownLatch(1)
+  val spent = new AtomicBoolean(false)
+
+  def process(processor: ControllerEventProcessor): Unit = {
+    if (spent.getAndSet(true))
+      return
+    processingStarted.countDown()
+    processor.process(event)
+  }
+
+  def preempt(processor: ControllerEventProcessor): Unit = {
+    if (spent.getAndSet(true))
+      return
+    processor.preempt(event)
+  }
+
+  def awaitProcessing(): Unit = {
+    processingStarted.await()
+  }
+
+  override def toString: String = {
+    s"QueuedEvent(event=$event, enqueueTimeMs=$enqueueTimeMs)"
+  }
+}
+
+class ControllerEventManager(controllerId: Int,
+                             processor: ControllerEventProcessor,
+                             time: Time,
+                             rateAndTimeMetrics: Map[ControllerState, KafkaTimer]) extends KafkaMetricsGroup {
 
   @volatile private var _state: ControllerState = ControllerState.Idle
   private val putLock = new ReentrantLock()
-  private val queue = new LinkedBlockingQueue[ControllerEvent]
+  private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
   private[controller] val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
-  private val time = Time.SYSTEM
 
   private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
 
@@ -55,55 +88,48 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
     }
   )
 
-
   def state: ControllerState = _state
 
   def start(): Unit = thread.start()
 
   def close(): Unit = {
     thread.initiateShutdown()
-    clearAndPut(KafkaController.ShutdownEventThread)
+    clearAndPut(ShutdownEventThread)
     thread.awaitShutdown()
   }
 
-  def put(event: ControllerEvent): Unit = inLock(putLock) {
-    queue.put(event)
+  def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
+    val queuedEvent = new QueuedEvent(event, time.milliseconds())
+    queue.put(queuedEvent)
+    queuedEvent
   }
 
-  def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) {
-    queue.asScala.foreach(evt =>
-      if (evt.isInstanceOf[PreemptableControllerEvent])
-        evt.asInstanceOf[PreemptableControllerEvent].preempt()
-    )
+  def clearAndPut(event: ControllerEvent): QueuedEvent = inLock(putLock) {
+    queue.asScala.foreach(_.preempt(processor))
     queue.clear()
     put(event)
   }
 
+  def isEmpty: Boolean = queue.isEmpty
+
   class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
     logIdent = s"[ControllerEventThread controllerId=$controllerId] "
 
     override def doWork(): Unit = {
-      queue.take() match {
-        case KafkaController.ShutdownEventThread => // The shutting down of the thread has been initiated at this point. Ignore this event.
+      val dequeued = queue.take()
+      dequeued.event match {
+        case ShutdownEventThread => // The shutting down of the thread has been initiated at this point. Ignore this event.
         case controllerEvent =>
           _state = controllerEvent.state
 
-          eventQueueTimeHist.update(time.milliseconds() - controllerEvent.enqueueTimeMs)
+          eventQueueTimeHist.update(time.milliseconds() - dequeued.enqueueTimeMs)
 
           try {
             rateAndTimeMetrics(state).time {
-              controllerEvent.process()
+              dequeued.process(processor)
             }
           } catch {
-            case e: ControllerMovedException =>
-              info(s"Controller moved to another broker when processing $controllerEvent.", e)
-              controllerMovedListener()
-            case e: Throwable => error(s"Error processing event $controllerEvent", e)
-          }
-
-          try eventProcessedListener(controllerEvent)
-          catch {
-            case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
+            case e: Throwable => error(s"Uncaught error processing event $controllerEvent", e)
           }
 
           _state = ControllerState.Idle
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 183ffaf..0234c3b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,13 +16,13 @@
  */
 package kafka.controller
 
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 import kafka.admin.AdminOperationException
 import kafka.api._
 import kafka.common._
+import kafka.controller.KafkaController.ElectPreferredLeadersCallback
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server._
 import kafka.utils._
@@ -32,39 +32,37 @@ import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChange
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, ApiError, LeaderAndIsrResponse, StopReplicaResponse}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, ApiError, LeaderAndIsrResponse}
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.KeeperException
 import org.apache.zookeeper.KeeperException.Code
-import scala.collection.JavaConverters._
 
+import scala.collection.JavaConverters._
 import scala.collection._
 import scala.util.{Failure, Try}
 
+sealed trait ElectionType
+object AutoTriggered extends ElectionType
+object ZkTriggered extends ElectionType
+object AdminClientTriggered extends ElectionType
+
 object KafkaController extends Logging {
   val InitialControllerEpoch = 0
   val InitialControllerEpochZkVersion = 0
 
-  /**
-   * ControllerEventThread will shutdown once it sees this event
-   */
-  private[controller] case object ShutdownEventThread extends ControllerEvent {
-    def state = ControllerState.ControllerShutdown
-    override def process(): Unit = ()
-  }
-
-  // Used only by test
-  private[controller] case class AwaitOnLatch(latch: CountDownLatch) extends ControllerEvent {
-    override def state: ControllerState = ControllerState.ControllerChange
-    override def process(): Unit = latch.await()
-  }
-
+  type ElectPreferredLeadersCallback = (Map[TopicPartition, Int], Map[TopicPartition, ApiError]) => Unit
 }
 
-class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Time, metrics: Metrics,
-                      initialBrokerInfo: BrokerInfo, initialBrokerEpoch: Long, tokenManager: DelegationTokenManager,
-                      threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig,
+                      zkClient: KafkaZkClient,
+                      time: Time,
+                      metrics: Metrics,
+                      initialBrokerInfo: BrokerInfo,
+                      initialBrokerEpoch: Long,
+                      tokenManager: DelegationTokenManager,
+                      threadNamePrefix: Option[String] = None)
+  extends ControllerEventProcessor with Logging with KafkaMetricsGroup {
 
   this.logIdent = s"[Controller id=${config.brokerId}] "
 
@@ -73,34 +71,36 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
   val controllerContext = new ControllerContext
-  var controllerChannelManager: ControllerChannelManager = _
+  var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
+    stateChangeLogger, threadNamePrefix)
 
   // have a separate scheduler for the controller to be able to start and stop independently of the kafka server
   // visible for testing
   private[controller] val kafkaScheduler = new KafkaScheduler(1)
 
   // visible for testing
-  private[controller] val eventManager = new ControllerEventManager(config.brokerId,
-    controllerContext.stats.rateAndTimeMetrics, _ => updateMetrics(), () => maybeResign())
+  private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,
+    controllerContext.stats.rateAndTimeMetrics)
 
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(config, controllerChannelManager,
+    eventManager, controllerContext, stateChangeLogger)
   val replicaStateMachine: ReplicaStateMachine = new ZkReplicaStateMachine(config, stateChangeLogger, controllerContext, zkClient,
-    new ControllerBrokerRequestBatch(this, stateChangeLogger))
+    new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
   val partitionStateMachine: PartitionStateMachine = new ZkPartitionStateMachine(config, stateChangeLogger, controllerContext, zkClient,
-    new ControllerBrokerRequestBatch(this, stateChangeLogger))
+    new ControllerBrokerRequestBatch(config, controllerChannelManager, eventManager, controllerContext, stateChangeLogger))
   val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
     partitionStateMachine, new ControllerDeletionClient(this, zkClient))
 
-  private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
-  private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
+  private val controllerChangeHandler = new ControllerChangeHandler(eventManager)
+  private val brokerChangeHandler = new BrokerChangeHandler(eventManager)
   private val brokerModificationsHandlers: mutable.Map[Int, BrokerModificationsHandler] = mutable.Map.empty
-  private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
-  private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
+  private val topicChangeHandler = new TopicChangeHandler(eventManager)
+  private val topicDeletionHandler = new TopicDeletionHandler(eventManager)
   private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
-  private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager)
-  private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager)
-  private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager)
-  private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)
+  private val partitionReassignmentHandler = new PartitionReassignmentHandler(eventManager)
+  private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(eventManager)
+  private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(eventManager)
+  private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(eventManager)
 
   @volatile private var activeControllerId = -1
   @volatile private var offlinePartitionCount = 0
@@ -174,12 +174,11 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
         eventManager.put(RegisterBrokerAndReelect)
       }
       override def beforeInitializingSession(): Unit = {
-        val expireEvent = new Expire
-        eventManager.clearAndPut(expireEvent)
+        val queuedEvent = eventManager.clearAndPut(Expire)
 
         // Block initialization of the new session until the expiration event is being handled,
         // which ensures that all pending events have been processed before creating the new session
-        expireEvent.waitUntilProcessingStarted()
+        queuedEvent.awaitProcessing()
       }
     })
     eventManager.put(Startup)
@@ -330,11 +329,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     replicaStateMachine.shutdown()
     zkClient.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
 
-
-    if (controllerChannelManager != null) {
-      controllerChannelManager.shutdown()
-      controllerChannelManager = null
-    }
+    controllerChannelManager.shutdown()
     controllerContext.resetContext()
 
     info("Resigned")
@@ -406,7 +401,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   private def registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit = {
     debug(s"Register BrokerModifications handler for $brokerIds")
     brokerIds.foreach { brokerId =>
-      val brokerModificationsHandler = new BrokerModificationsHandler(this, eventManager, brokerId)
+      val brokerModificationsHandler = new BrokerModificationsHandler(eventManager, brokerId)
       zkClient.registerZNodeChangeHandlerAndCheckExistence(brokerModificationsHandler)
       brokerModificationsHandlers.put(brokerId, brokerModificationsHandler)
     }
@@ -635,10 +630,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
   }
 
-  sealed trait ElectionType
-  object AutoTriggered extends ElectionType
-  object ZkTriggered extends ElectionType
-  object AdminClientTriggered extends ElectionType
 
   /**
     * Attempt to elect the preferred replica as leader for each of the given partitions.
@@ -687,7 +678,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager
-    startChannelManager()
+    controllerChannelManager.startup()
     initializePartitionReassignment()
     info(s"Currently active brokers in the cluster: ${controllerContext.liveBrokerIds}")
     info(s"Currently shutting brokers in the cluster: ${controllerContext.shuttingDownBrokerIds}")
@@ -720,8 +711,8 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info(s"Partitions being reassigned: $partitionsBeingReassigned")
 
     controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned.iterator.map { case (tp, newReplicas) =>
-      val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, tp)
-      tp -> new ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler)
+      val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, tp)
+      tp -> ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler)
     }
   }
 
@@ -738,12 +729,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     (topicsToBeDeleted, topicsIneligibleForDeletion)
   }
 
-  private def startChannelManager() {
-    controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
-      stateChangeLogger, threadNamePrefix)
-    controllerChannelManager.startup()
-  }
-
   private def updateLeaderAndIsrCache(partitions: Seq[TopicPartition] = controllerContext.allPartitions.toSeq) {
     val leaderIsrAndControllerEpochs = zkClient.getTopicPartitionStates(partitions)
     leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
@@ -846,7 +831,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
   private def registerPartitionModificationsHandlers(topics: Seq[String]) = {
     topics.foreach { topic =>
-      val partitionModificationsHandler = new PartitionModificationsHandler(this, eventManager, topic)
+      val partitionModificationsHandler = new PartitionModificationsHandler(eventManager, topic)
       partitionModificationsHandlers.put(topic, partitionModificationsHandler)
     }
     partitionModificationsHandlers.values.foreach(zkClient.registerZNodeChangeHandler)
@@ -916,11 +901,6 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  private[controller] def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
-                                      callback: AbstractResponse => Unit = null) = {
-    controllerChannelManager.sendRequest(brokerId, apiKey, request, callback)
-  }
-
   /**
    * Send the leader information for selected partitions to selected brokers so that they can correctly respond to
    * metadata requests
@@ -1016,177 +996,148 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
-
-    def state = ControllerState.AutoLeaderBalance
-
-    override def process(): Unit = {
-      if (!isActive) return
-      try {
-        checkAndTriggerAutoLeaderRebalance()
-      } finally {
-        scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
-      }
+  private def processAutoPreferredReplicaLeaderElection(): Unit = {
+    if (!isActive) return
+    try {
+      info("Processing automatic preferred replica leader election")
+      checkAndTriggerAutoLeaderRebalance()
+    } finally {
+      scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
     }
   }
 
-  case object UncleanLeaderElectionEnable extends ControllerEvent {
-
-    def state = ControllerState.UncleanLeaderElectionEnable
-
-    override def process(): Unit = {
-      if (!isActive) return
-      partitionStateMachine.triggerOnlinePartitionStateChange()
-    }
+  private def processUncleanLeaderElectionEnable(): Unit = {
+    if (!isActive) return
+    info("Unclean leader election has been enabled by default")
+    partitionStateMachine.triggerOnlinePartitionStateChange()
   }
 
-  case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent {
-
-    def state = ControllerState.TopicUncleanLeaderElectionEnable
-
-    override def process(): Unit = {
-      if (!isActive) return
-      partitionStateMachine.triggerOnlinePartitionStateChange(topic)
-    }
+  private def processTopicUncleanLeaderElectionEnable(topic: String): Unit = {
+    if (!isActive) return
+    info(s"Unclean leader election has been enabled for topic $topic")
+    partitionStateMachine.triggerOnlinePartitionStateChange(topic)
   }
 
-  case class ControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends PreemptableControllerEvent {
-
-    def state = ControllerState.ControlledShutdown
-
-    override def handlePreempt(): Unit = {
-      controlledShutdownCallback(Failure(new ControllerMovedException("Controller moved to another broker")))
-    }
-
-    override def handleProcess(): Unit = {
-      val controlledShutdownResult = Try { doControlledShutdown(id) }
-      controlledShutdownCallback(controlledShutdownResult)
-    }
+  private def preemptControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit = {
+    controlledShutdownCallback(Failure(new ControllerMovedException("Controller moved to another broker")))
+  }
 
-    private def doControlledShutdown(id: Int): Set[TopicPartition] = {
-      if (!isActive) {
-        throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
-      }
+  private def processControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit = {
+    val controlledShutdownResult = Try { doControlledShutdown(id, brokerEpoch) }
+    controlledShutdownCallback(controlledShutdownResult)
+  }
 
-      // broker epoch in the request is unknown if the controller hasn't been upgraded to use KIP-380
-      // so we will keep the previous behavior and don't reject the request
-      if (brokerEpoch != AbstractControlRequest.UNKNOWN_BROKER_EPOCH) {
-        val cachedBrokerEpoch = controllerContext.liveBrokerIdAndEpochs(id)
-        if (brokerEpoch < cachedBrokerEpoch) {
-          val stateBrokerEpochErrorMessage = "Received controlled shutdown request from an old broker epoch " +
-            s"$brokerEpoch for broker $id. Current broker epoch is $cachedBrokerEpoch."
-          info(stateBrokerEpochErrorMessage)
-          throw new StaleBrokerEpochException(stateBrokerEpochErrorMessage)
-        }
+  private def doControlledShutdown(id: Int, brokerEpoch: Long): Set[TopicPartition] = {
+    if (!isActive) {
+      throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
+    }
+
+    // broker epoch in the request is unknown if the controller hasn't been upgraded to use KIP-380
+    // so we will keep the previous behavior and don't reject the request
+    if (brokerEpoch != AbstractControlRequest.UNKNOWN_BROKER_EPOCH) {
+      val cachedBrokerEpoch = controllerContext.liveBrokerIdAndEpochs(id)
+      if (brokerEpoch < cachedBrokerEpoch) {
+        val stateBrokerEpochErrorMessage = "Received controlled shutdown request from an old broker epoch " +
+          s"$brokerEpoch for broker $id. Current broker epoch is $cachedBrokerEpoch."
+        info(stateBrokerEpochErrorMessage)
+        throw new StaleBrokerEpochException(stateBrokerEpochErrorMessage)
       }
+    }
 
-      info(s"Shutting down broker $id")
+    info(s"Shutting down broker $id")
 
-      if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
-        throw new BrokerNotAvailableException(s"Broker id $id does not exist.")
+    if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
+      throw new BrokerNotAvailableException(s"Broker id $id does not exist.")
 
-      controllerContext.shuttingDownBrokerIds.add(id)
-      debug(s"All shutting down brokers: ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
-      debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
+    controllerContext.shuttingDownBrokerIds.add(id)
+    debug(s"All shutting down brokers: ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
+    debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
 
-      val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter { partition =>
-        controllerContext.partitionReplicaAssignment(partition).size > 1 &&
-          controllerContext.partitionLeadershipInfo.contains(partition) &&
-          !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)
-      }
-      val (partitionsLedByBroker, partitionsFollowedByBroker) = partitionsToActOn.partition { partition =>
-        controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
-      }
-      partitionStateMachine.handleStateChanges(partitionsLedByBroker.toSeq, OnlinePartition, Some(ControlledShutdownPartitionLeaderElectionStrategy))
-      try {
-        brokerRequestBatch.newBatch()
-        partitionsFollowedByBroker.foreach { partition =>
-          brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition, deletePartition = false)
-        }
-        brokerRequestBatch.sendRequestsToBrokers(epoch)
-      } catch {
-        case e: IllegalStateException =>
-          handleIllegalState(e)
-      }
-      // If the broker is a follower, updates the isr in ZK and notifies the current leader
-      replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition =>
-        PartitionAndReplica(partition, id)).toSeq, OfflineReplica)
-      def replicatedPartitionsBrokerLeads() = {
-        trace(s"All leaders = ${controllerContext.partitionLeadershipInfo.mkString(",")}")
-        controllerContext.partitionLeadershipInfo.filter {
-          case (topicPartition, leaderIsrAndControllerEpoch) =>
-            !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
-              leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
-              controllerContext.partitionReplicaAssignment(topicPartition).size > 1
-        }.keys
+    val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter { partition =>
+      controllerContext.partitionReplicaAssignment(partition).size > 1 &&
+        controllerContext.partitionLeadershipInfo.contains(partition) &&
+        !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)
+    }
+    val (partitionsLedByBroker, partitionsFollowedByBroker) = partitionsToActOn.partition { partition =>
+      controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
+    }
+    partitionStateMachine.handleStateChanges(partitionsLedByBroker.toSeq, OnlinePartition, Some(ControlledShutdownPartitionLeaderElectionStrategy))
+    try {
+      brokerRequestBatch.newBatch()
+      partitionsFollowedByBroker.foreach { partition =>
+        brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition, deletePartition = false)
       }
-      replicatedPartitionsBrokerLeads().toSet
+      brokerRequestBatch.sendRequestsToBrokers(epoch)
+    } catch {
+      case e: IllegalStateException =>
+        handleIllegalState(e)
+    }
+    // If the broker is a follower, updates the isr in ZK and notifies the current leader
+    replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition =>
+      PartitionAndReplica(partition, id)).toSeq, OfflineReplica)
+    def replicatedPartitionsBrokerLeads() = {
+      trace(s"All leaders = ${controllerContext.partitionLeadershipInfo.mkString(",")}")
+      controllerContext.partitionLeadershipInfo.filter {
+        case (topicPartition, leaderIsrAndControllerEpoch) =>
+          !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
+            controllerContext.partitionReplicaAssignment(topicPartition).size > 1
+      }.keys
     }
+    replicatedPartitionsBrokerLeads().toSet
   }
 
-  case class LeaderAndIsrResponseReceived(LeaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
+  private def processLeaderAndIsrResponseReceived(leaderAndIsrResponseObj: AbstractResponse, brokerId: Int): Unit = {
+    if (!isActive) return
+    val leaderAndIsrResponse = leaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
 
-    def state = ControllerState.LeaderAndIsrResponseReceived
-
-    override def process(): Unit = {
-      if (!isActive) return
-      val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
-
-      if (leaderAndIsrResponse.error != Errors.NONE) {
-        stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
-        return
-      }
-
-      val offlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
-        case (tp, error) if error == Errors.KAFKA_STORAGE_ERROR => tp
-      }
-      val onlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
-        case (tp, error) if error == Errors.NONE => tp
-      }
-      val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicPartition])
-      val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
-      controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
-      val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
-
-      if (newOfflineReplicas.nonEmpty) {
-        stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
-        onReplicasBecomeOffline(newOfflineReplicas.map(PartitionAndReplica(_, brokerId)))
-      }
+    if (leaderAndIsrResponse.error != Errors.NONE) {
+      stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
+      return
     }
-  }
 
-  case class TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
-
-    def state = ControllerState.TopicDeletion
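+    // Partitions that returned KAFKA_STORAGE_ERROR are offline on this broker; those that returned NONE are online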
+    val offlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
+      case (tp, error) if error == Errors.KAFKA_STORAGE_ERROR => tp
+    }
+    val onlineReplicas = leaderAndIsrResponse.responses.asScala.collect {
+      case (tp, error) if error == Errors.NONE => tp
+    }
+    val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicPartition])
+    val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
+    controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
+    val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
 
-    override def process(): Unit = {
-      if (!isActive) return
-      val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
-      val responseMap = stopReplicaResponse.responses.asScala
-      debug(s"Delete topic callback invoked on StopReplica response received from broker $replicaId: $stopReplicaResponse")
-      val partitionsInError =
-        if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
-        else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
-      val replicasInError = partitionsInError.map(PartitionAndReplica(_, replicaId))
-      // move all the failed replicas to ReplicaDeletionIneligible
-      topicDeletionManager.failReplicaDeletion(replicasInError)
-      if (replicasInError.size != responseMap.size) {
-        // some replicas could have been successfully deleted
-        val deletedReplicas = responseMap.keySet -- partitionsInError
-        topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(PartitionAndReplica(_, replicaId)))
-      }
+    if (newOfflineReplicas.nonEmpty) {
+      stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
+      onReplicasBecomeOffline(newOfflineReplicas.map(PartitionAndReplica(_, brokerId)))
     }
   }
 
-  case object Startup extends ControllerEvent {
+  private def processTopicDeletionStopReplicaResponseReceived(replicaId: Int,
+                                                              requestError: Errors,
+                                                              partitionErrors: Map[TopicPartition, Errors]): Unit = {
+    if (!isActive) return
+    debug(s"Delete topic callback invoked on StopReplica response received from broker $replicaId: " +
+      s"request error = $requestError, partition errors = $partitionErrors")
 
-    def state = ControllerState.ControllerChange
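+    // A request-level error fails every partition in the request; otherwise only the partitions with a partition-level error fail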
+    val partitionsInError = if (requestError != Errors.NONE)
+      partitionErrors.keySet
+    else
+      partitionErrors.filter { case (_, error) => error != Errors.NONE }.keySet
 
-    override def process(): Unit = {
-      zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
-      elect()
+    val replicasInError = partitionsInError.map(PartitionAndReplica(_, replicaId))
+    // move all the failed replicas to ReplicaDeletionIneligible
+    topicDeletionManager.failReplicaDeletion(replicasInError)
+    if (replicasInError.size != partitionErrors.size) {
+      // some replicas could have been successfully deleted
+      val deletedReplicas = partitionErrors.keySet -- partitionsInError
+      topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(PartitionAndReplica(_, replicaId)))
     }
+  }
 
+  private def processStartup(): Unit = {
+    zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+    elect()
   }
 
   private def updateMetrics(): Unit = {
@@ -1288,116 +1239,98 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
-  case object BrokerChange extends ControllerEvent {
-    override def state: ControllerState = ControllerState.BrokerChange
-
-    override def process(): Unit = {
-      if (!isActive) return
-      val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
-      val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
-      val curBrokerIds = curBrokerIdAndEpochs.keySet
-      val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
-      val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
-      val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
-      val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
-        .filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
-      val newBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => newBrokerIds.contains(broker.id))
-      val bouncedBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => bouncedBrokerIds.contains(broker.id))
-      val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
-      val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
-      val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
-      val bouncedBrokerIdsSorted = bouncedBrokerIds.toSeq.sorted
-      info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
-        s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, " +
-        s"bounced brokers: ${bouncedBrokerIdsSorted.mkString(",")}, " +
-        s"all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
-
-      newBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
-      bouncedBrokerIds.foreach(controllerChannelManager.removeBroker)
-      bouncedBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
-      deadBrokerIds.foreach(controllerChannelManager.removeBroker)
-      if (newBrokerIds.nonEmpty) {
-        controllerContext.addLiveBrokersAndEpochs(newBrokerAndEpochs)
-        onBrokerStartup(newBrokerIdsSorted)
-      }
-      if (bouncedBrokerIds.nonEmpty) {
-        controllerContext.removeLiveBrokers(bouncedBrokerIds)
-        onBrokerFailure(bouncedBrokerIdsSorted)
-        controllerContext.addLiveBrokersAndEpochs(bouncedBrokerAndEpochs)
-        onBrokerStartup(bouncedBrokerIdsSorted)
-      }
-      if (deadBrokerIds.nonEmpty) {
-        controllerContext.removeLiveBrokers(deadBrokerIds)
-        onBrokerFailure(deadBrokerIdsSorted)
-      }
-
-      if (newBrokerIds.nonEmpty || deadBrokerIds.nonEmpty || bouncedBrokerIds.nonEmpty) {
-        info(s"Updated broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
-      }
-    }
-  }
-
-  case class BrokerModifications(brokerId: Int) extends ControllerEvent {
-    override def state: ControllerState = ControllerState.BrokerChange
-
-    override def process(): Unit = {
-      if (!isActive) return
-      val newMetadataOpt = zkClient.getBroker(brokerId)
-      val oldMetadataOpt = controllerContext.liveOrShuttingDownBroker(brokerId)
-      if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
-        val oldMetadata = oldMetadataOpt.get
-        val newMetadata = newMetadataOpt.get
-        if (newMetadata.endPoints != oldMetadata.endPoints) {
-          info(s"Updated broker metadata: $oldMetadata -> $newMetadata")
-          controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
-          onBrokerUpdate(brokerId)
-        }
+  private def processBrokerChange(): Unit = {
+    if (!isActive) return
+    val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
+    val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
+    val curBrokerIds = curBrokerIdAndEpochs.keySet
+    val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
+    val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
+    val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
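+    // A broker that is still registered but has a higher epoch was bounced (restarted) since the last change event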
+    val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
+      .filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
+    val newBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => newBrokerIds.contains(broker.id))
+    val bouncedBrokerAndEpochs = curBrokerAndEpochs.filterKeys(broker => bouncedBrokerIds.contains(broker.id))
+    val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
+    val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
+    val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
+    val bouncedBrokerIdsSorted = bouncedBrokerIds.toSeq.sorted
+    info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
+      s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, " +
+      s"bounced brokers: ${bouncedBrokerIdsSorted.mkString(",")}, " +
+      s"all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
+
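+    // Keep the channel manager in sync before invoking the startup/failure callbacks below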
+    newBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
+    bouncedBrokerIds.foreach(controllerChannelManager.removeBroker)
+    bouncedBrokerAndEpochs.keySet.foreach(controllerChannelManager.addBroker)
+    deadBrokerIds.foreach(controllerChannelManager.removeBroker)
+    if (newBrokerIds.nonEmpty) {
+      controllerContext.addLiveBrokersAndEpochs(newBrokerAndEpochs)
+      onBrokerStartup(newBrokerIdsSorted)
+    }
+    if (bouncedBrokerIds.nonEmpty) {
+      controllerContext.removeLiveBrokers(bouncedBrokerIds)
+      onBrokerFailure(bouncedBrokerIdsSorted)
+      controllerContext.addLiveBrokersAndEpochs(bouncedBrokerAndEpochs)
+      onBrokerStartup(bouncedBrokerIdsSorted)
+    }
+    if (deadBrokerIds.nonEmpty) {
+      controllerContext.removeLiveBrokers(deadBrokerIds)
+      onBrokerFailure(deadBrokerIdsSorted)
+    }
+
+    if (newBrokerIds.nonEmpty || deadBrokerIds.nonEmpty || bouncedBrokerIds.nonEmpty) {
+      info(s"Updated broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
+    }
+  }
+
+  private def processBrokerModification(brokerId: Int): Unit = {
+    if (!isActive) return
+    val newMetadataOpt = zkClient.getBroker(brokerId)
+    val oldMetadataOpt = controllerContext.liveOrShuttingDownBroker(brokerId)
+    if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
+      val oldMetadata = oldMetadataOpt.get
+      val newMetadata = newMetadataOpt.get
+      if (newMetadata.endPoints != oldMetadata.endPoints) {
+        info(s"Updated broker metadata: $oldMetadata -> $newMetadata")
+        controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
+        onBrokerUpdate(brokerId)
       }
     }
   }
 
-  case object TopicChange extends ControllerEvent {
-    override def state: ControllerState = ControllerState.TopicChange
-
-    override def process(): Unit = {
-      if (!isActive) return
-      val topics = zkClient.getAllTopicsInCluster.toSet
-      val newTopics = topics -- controllerContext.allTopics
-      val deletedTopics = controllerContext.allTopics -- topics
-      controllerContext.allTopics = topics
+  private def processTopicChange(): Unit = {
+    if (!isActive) return
+    val topics = zkClient.getAllTopicsInCluster.toSet
+    val newTopics = topics -- controllerContext.allTopics
+    val deletedTopics = controllerContext.allTopics -- topics
+    controllerContext.allTopics = topics
 
-      registerPartitionModificationsHandlers(newTopics.toSeq)
-      val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
-      deletedTopics.foreach(controllerContext.removeTopic)
-      addedPartitionReplicaAssignment.foreach {
-        case (topicAndPartition, newReplicas) => controllerContext.updatePartitionReplicaAssignment(topicAndPartition, newReplicas)
-      }
-      info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
-        s"[$addedPartitionReplicaAssignment]")
-      if (addedPartitionReplicaAssignment.nonEmpty)
-        onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
+    registerPartitionModificationsHandlers(newTopics.toSeq)
+    val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
+    deletedTopics.foreach(controllerContext.removeTopic)
+    addedPartitionReplicaAssignment.foreach {
+      case (topicAndPartition, newReplicas) => controllerContext.updatePartitionReplicaAssignment(topicAndPartition, newReplicas)
     }
+    info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
+      s"[$addedPartitionReplicaAssignment]")
+    if (addedPartitionReplicaAssignment.nonEmpty)
+      onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
   }
 
-  case object LogDirEventNotification extends ControllerEvent {
-    override def state: ControllerState = ControllerState.LogDirChange
-
-    override def process(): Unit = {
-      if (!isActive) return
-      val sequenceNumbers = zkClient.getAllLogDirEventNotifications
-      try {
-        val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
-        onBrokerLogDirFailure(brokerIds)
-      } finally {
-        // delete processed children
-        zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion)
-      }
+  private def processLogDirEventNotification(): Unit = {
+    if (!isActive) return
+    val sequenceNumbers = zkClient.getAllLogDirEventNotifications
+    try {
+      val brokerIds = zkClient.getBrokerIdsFromLogDirEvents(sequenceNumbers)
+      onBrokerLogDirFailure(brokerIds)
+    } finally {
+      // delete processed children
+      zkClient.deleteLogDirEventNotifications(sequenceNumbers, controllerContext.epochZkVersion)
     }
   }
 
-  case class PartitionModifications(topic: String) extends ControllerEvent {
-    override def state: ControllerState = ControllerState.TopicChange
-
+  private def processPartitionModifications(topic: String): Unit = {
     def restorePartitionReplicaAssignment(topic: String, newPartitionReplicaAssignment : immutable.Map[TopicPartition, Seq[Int]]): Unit = {
       info("Restoring the partition replica assignment for topic %s".format(topic))
 
@@ -1408,349 +1341,373 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
       zkClient.setTopicAssignment(topic, existingPartitionReplicaAssignment, controllerContext.epochZkVersion)
     }
 
-    override def process(): Unit = {
-      if (!isActive) return
-      val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
-      val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
-        controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
-      }
-      if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
-        if (partitionsToBeAdded.nonEmpty) {
-          warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
-            .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+    if (!isActive) return
+    val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
+    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
+      controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
+    }
+    if (topicDeletionManager.isTopicQueuedUpForDeletion(topic))
+      if (partitionsToBeAdded.nonEmpty) {
+        warn("Skipping adding partitions %s for topic %s since it is currently being deleted"
+          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
 
-          restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
-        } else {
-          // This can happen if existing partition replica assignment are restored to prevent increasing partition count during topic deletion
-          info("Ignoring partition change during topic deletion as no new partitions are added")
-        }
-      else {
-        if (partitionsToBeAdded.nonEmpty) {
-          info(s"New partitions to be added $partitionsToBeAdded")
-          partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
-            controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
-          }
-          onNewPartitionCreation(partitionsToBeAdded.keySet)
+        restorePartitionReplicaAssignment(topic, partitionReplicaAssignment)
+      } else {
+        // This can happen if an existing partition replica assignment was restored to prevent the partition count from increasing during topic deletion
+        info("Ignoring partition change during topic deletion as no new partitions are added")
+      }
+    else {
+      if (partitionsToBeAdded.nonEmpty) {
+        info(s"New partitions to be added $partitionsToBeAdded")
+        partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>
+          controllerContext.updatePartitionReplicaAssignment(topicPartition, assignedReplicas)
         }
+        onNewPartitionCreation(partitionsToBeAdded.keySet)
       }
     }
   }
 
-  case object TopicDeletion extends ControllerEvent {
-    override def state: ControllerState = ControllerState.TopicDeletion
-
-    override def process(): Unit = {
-      if (!isActive) return
-      var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
-      debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
-      val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
-      if (nonExistentTopics.nonEmpty) {
-        warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
-        zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
-      }
-      topicsToBeDeleted --= nonExistentTopics
-      if (config.deleteTopicEnable) {
-        if (topicsToBeDeleted.nonEmpty) {
-          info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
-          // mark topic ineligible for deletion if other state changes are in progress
-          topicsToBeDeleted.foreach { topic =>
-            val partitionReassignmentInProgress =
-              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
-            if (partitionReassignmentInProgress)
-              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
-          }
-          // add topic to deletion list
-          topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
+  private def processTopicDeletion(): Unit = {
+    if (!isActive) return
+    var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
+    debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
+    val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
+    if (nonExistentTopics.nonEmpty) {
+      warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
+      zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
+    }
+    topicsToBeDeleted --= nonExistentTopics
+    if (config.deleteTopicEnable) {
+      if (topicsToBeDeleted.nonEmpty) {
+        info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
+        // mark topic ineligible for deletion if other state changes are in progress
+        topicsToBeDeleted.foreach { topic =>
+          val partitionReassignmentInProgress =
+            controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+          if (partitionReassignmentInProgress)
+            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
         }
-      } else {
-        // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
-        info(s"Removing $topicsToBeDeleted since delete topic is disabled")
-        zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
+        // add the topics to the deletion list
+        topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
       }
+    } else {
+      // If delete topic is disabled, remove the entries under the ZooKeeper path /admin/delete_topics
+      info(s"Removing $topicsToBeDeleted since delete topic is disabled")
+      zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
     }
   }
 
-  case object PartitionReassignment extends ControllerEvent {
-    override def state: ControllerState = ControllerState.PartitionReassignment
+  private def processPartitionReassignment(): Unit = {
+    if (!isActive) return
 
-    override def process(): Unit = {
-      if (!isActive) return
+    // We need to register the watcher if the path doesn't exist in order to detect future reassignments, and we get
+    // the `path exists` check for free
+    if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
+      val partitionReassignment = zkClient.getPartitionReassignment
 
-      // We need to register the watcher if the path doesn't exist in order to detect future reassignments and we get
-      // the `path exists` check for free
-      if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
-        val partitionReassignment = zkClient.getPartitionReassignment
-
-        // Populate `partitionsBeingReassigned` with all partitions being reassigned before invoking
-        // `maybeTriggerPartitionReassignment` (see method documentation for the reason)
-        partitionReassignment.foreach { case (tp, newReplicas) =>
-          val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(KafkaController.this, eventManager,
-            tp)
-          controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
-        }
-
-        maybeTriggerPartitionReassignment(partitionReassignment.keySet)
+      // Populate `partitionsBeingReassigned` with all partitions being reassigned before invoking
+      // `maybeTriggerPartitionReassignment` (see method documentation for the reason)
+      partitionReassignment.foreach { case (tp, newReplicas) =>
+        val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, tp)
+        controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
       }
-    }
-  }
 
-  case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
-    override def state: ControllerState = ControllerState.PartitionReassignment
-
-    override def process(): Unit = {
-      if (!isActive) return
-      // check if this partition is still being reassigned or not
-      controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
-        val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
-        zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
-          case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
-            val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
-            val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
-            if (caughtUpReplicas == reassignedReplicas) {
-              // resume the partition reassignment process
-              info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
-                s"partition $partition being reassigned. Resuming partition reassignment")
-              onPartitionReassignment(partition, reassignedPartitionContext)
-            }
-            else {
-              info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
-                s"partition $partition being reassigned. Replica(s) " +
-                s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
-            }
-          case None => error(s"Error handling reassignment of partition $partition to replicas " +
-                         s"${reassignedReplicas.mkString(",")} as it was never created")
-        }
-      }
+      maybeTriggerPartitionReassignment(partitionReassignment.keySet)
     }
   }
 
-  case object IsrChangeNotification extends ControllerEvent {
-    override def state: ControllerState = ControllerState.IsrChange
-
-    override def process(): Unit = {
-      if (!isActive) return
-      val sequenceNumbers = zkClient.getAllIsrChangeNotifications
-      try {
-        val partitions = zkClient.getPartitionsFromIsrChangeNotifications(sequenceNumbers)
-        if (partitions.nonEmpty) {
-          updateLeaderAndIsrCache(partitions)
-          processUpdateNotifications(partitions)
-        }
-      } finally {
-        // delete the notifications
-        zkClient.deleteIsrChangeNotifications(sequenceNumbers, controllerContext.epochZkVersion)
+  private def processPartitionReassignmentIsrChange(partition: TopicPartition): Unit = {
+    if (!isActive) return
+    // check if this partition is still being reassigned
+    controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
+      val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
+      zkClient.getTopicPartitionStates(Seq(partition)).get(partition) match {
+        case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
+          val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+          val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
+          if (caughtUpReplicas == reassignedReplicas) {
+            // resume the partition reassignment process
+            info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
+              s"partition $partition being reassigned. Resuming partition reassignment")
+            onPartitionReassignment(partition, reassignedPartitionContext)
+          } else {
+            info(s"${caughtUpReplicas.size}/${reassignedReplicas.size} replicas have caught up with the leader for " +
+              s"partition $partition being reassigned. Replica(s) " +
+              s"${(reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")} still need to catch up")
+          }
+        case None => error(s"Error handling reassignment of partition $partition to replicas " +
+          s"${reassignedReplicas.mkString(",")} as it was never created")
       }
     }
+  }
 
-    private def processUpdateNotifications(partitions: Seq[TopicPartition]) {
+  private def processIsrChangeNotification(): Unit = {
+    def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
       val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
       debug(s"Sending MetadataRequest to Brokers: $liveBrokers for TopicPartitions: $partitions")
       sendUpdateMetadataRequest(liveBrokers, partitions.toSet)
     }
+
+    if (!isActive) return
+    val sequenceNumbers = zkClient.getAllIsrChangeNotifications
+    try {
+      val partitions = zkClient.getPartitionsFromIsrChangeNotifications(sequenceNumbers)
+      if (partitions.nonEmpty) {
+        updateLeaderAndIsrCache(partitions)
+        processUpdateNotifications(partitions)
+      }
+    } finally {
+      // delete the notifications
+      zkClient.deleteIsrChangeNotifications(sequenceNumbers, controllerContext.epochZkVersion)
+    }
   }
 
-  type ElectPreferredLeadersCallback = (Map[TopicPartition, Int], Map[TopicPartition, ApiError])=>Unit
 
-  def electPreferredLeaders(partitions: Set[TopicPartition], callback: ElectPreferredLeadersCallback = { (_,_) => }): Unit =
+  def electPreferredLeaders(partitions: Set[TopicPartition], callback: ElectPreferredLeadersCallback = { (_,_) => }): Unit = {
     eventManager.put(PreferredReplicaLeaderElection(Some(partitions), AdminClientTriggered, callback))
+  }
 
-  case class PreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
-                                            electionType: ElectionType = ZkTriggered,
-                                            callback: ElectPreferredLeadersCallback = (_,_) =>{}) extends PreemptableControllerEvent {
-    override def state: ControllerState = ControllerState.ManualLeaderBalance
+  private def preemptPreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
+                                                    callback: ElectPreferredLeadersCallback = (_, _) => {}): Unit = {
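+    // Complete the callback with NOT_CONTROLLER for every requested partition so that the caller can retry against the new controller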
+    callback(Map.empty, partitionsFromAdminClientOpt match {
+      case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap
+      case None => Map.empty
+    })
+  }
 
-    override def handlePreempt(): Unit = {
+  private def processPreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
+                                                    electionType: ElectionType = ZkTriggered,
+                                                    callback: ElectPreferredLeadersCallback = (_, _) => {}): Unit = {
+    if (!isActive) {
       callback(Map.empty, partitionsFromAdminClientOpt match {
         case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap
         case None => Map.empty
       })
-    }
-
-    override def handleProcess(): Unit = {
-      if (!isActive) {
-        callback(Map.empty, partitionsFromAdminClientOpt match {
-          case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap
-          case None => Map.empty
-        })
-      } else {
-        // We need to register the watcher if the path doesn't exist in order to detect future preferred replica
-        // leader elections and we get the `path exists` check for free
-        if (electionType == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
-          val partitions = partitionsFromAdminClientOpt match {
-            case Some(partitions) => partitions
-            case None => zkClient.getPreferredReplicaElection
-          }
-
-          val (validPartitions, invalidPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp))
-          invalidPartitions.foreach { p =>
-            info(s"Skipping preferred replica leader election for partition ${p} since it doesn't exist.")
-          }
+    } else {
+      // We need to register the watcher if the path doesn't exist in order to detect future preferred replica
+      // leader elections, and we get the `path exists` check for free
+      if (electionType == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
+        val partitions = partitionsFromAdminClientOpt match {
+          case Some(partitions) => partitions
+          case None => zkClient.getPreferredReplicaElection
+        }
 
-          val (partitionsBeingDeleted, livePartitions) = validPartitions.partition(partition =>
-            topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
-          if (partitionsBeingDeleted.nonEmpty) {
-            warn(s"Skipping preferred replica election for partitions $partitionsBeingDeleted " +
-              s"since the respective topics are being deleted")
-          }
-          // partition those where preferred is already leader
-          val (electablePartitions, alreadyPreferred) = livePartitions.partition { partition =>
-            val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
-            val preferredReplica = assignedReplicas.head
-            val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
-            currentLeader != preferredReplica
-          }
+        val (validPartitions, invalidPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp))
+        invalidPartitions.foreach { p =>
+          info(s"Skipping preferred replica leader election for partition ${p} since it doesn't exist.")
+        }
 
-          val electionErrors = onPreferredReplicaElection(electablePartitions, electionType)
-          val successfulPartitions = electablePartitions -- electionErrors.keySet
-          val results = electionErrors.map { case (partition, ex) =>
-            val apiError = if (ex.isInstanceOf[StateChangeFailedException])
-              new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE, ex.getMessage)
-            else
-              ApiError.fromThrowable(ex)
-            partition -> apiError
-          } ++
-            alreadyPreferred.map(_ -> ApiError.NONE) ++
-            partitionsBeingDeleted.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted")) ++
-            invalidPartitions.map ( tp => tp -> new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, s"The partition does not exist.")
-            )
-          debug(s"PreferredReplicaLeaderElection waiting: $successfulPartitions, results: $results")
-          callback(successfulPartitions.map(
-              tp => tp->controllerContext.partitionReplicaAssignment(tp).head).toMap,
-            results)
+        val (partitionsBeingDeleted, livePartitions) = validPartitions.partition(partition =>
+          topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
+        if (partitionsBeingDeleted.nonEmpty) {
+          warn(s"Skipping preferred replica election for partitions $partitionsBeingDeleted " +
+            s"since the respective topics are being deleted")
+        }
+        // partition those where preferred is already leader
+        val (electablePartitions, alreadyPreferred) = livePartitions.partition { partition =>
+          val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
+          val preferredReplica = assignedReplicas.head
+          val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
+          currentLeader != preferredReplica
         }
+
+        val electionErrors = onPreferredReplicaElection(electablePartitions, electionType)
+        val successfulPartitions = electablePartitions -- electionErrors.keySet
+        val results = electionErrors.map { case (partition, ex) =>
+          val apiError = if (ex.isInstanceOf[StateChangeFailedException])
+            new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE, ex.getMessage)
+          else
+            ApiError.fromThrowable(ex)
+          partition -> apiError
+        } ++
+          alreadyPreferred.map(_ -> ApiError.NONE) ++
+          partitionsBeingDeleted.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted")) ++
+          invalidPartitions.map(tp => tp -> new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist."))
+        debug(s"PreferredReplicaLeaderElection waiting: $successfulPartitions, results: $results")
+        callback(successfulPartitions.map(tp => tp -> controllerContext.partitionReplicaAssignment(tp).head).toMap,
+          results)
       }
     }
   }
 
-  case object ControllerChange extends ControllerEvent {
-    override def state = ControllerState.ControllerChange
-
-    override def process(): Unit = {
-      maybeResign()
-    }
+  private def processControllerChange(): Unit = {
+    maybeResign()
   }
 
-  case object Reelect extends ControllerEvent {
-    override def state = ControllerState.ControllerChange
-
-    override def process(): Unit = {
-      maybeResign()
-      elect()
-    }
+  private def processReelect(): Unit = {
+    maybeResign()
+    elect()
   }
 
-  case object RegisterBrokerAndReelect extends ControllerEvent {
-    override def state: ControllerState = ControllerState.ControllerChange
+  private def processRegisterBrokerAndReelect(): Unit = {
+    _brokerEpoch = zkClient.registerBroker(brokerInfo)
+    processReelect()
+  }
 
-    override def process(): Unit = {
-      _brokerEpoch = zkClient.registerBroker(brokerInfo)
-      Reelect.process()
-    }
+  private def processExpire(): Unit = {
+    activeControllerId = -1
+    onControllerResignation()
   }
 
-  // We can't make this a case object due to the countDownLatch field
-  class Expire extends ControllerEvent {
-    private val processingStarted = new CountDownLatch(1)
-    override def state = ControllerState.ControllerChange
 
-    override def process(): Unit = {
-      processingStarted.countDown()
-      activeControllerId = -1
-      onControllerResignation()
+  override def process(event: ControllerEvent): Unit = {
+    try {
+      event match {
+        // Used only in test cases
+        case event: MockEvent =>
+          event.process()
+        case AutoPreferredReplicaLeaderElection =>
+          processAutoPreferredReplicaLeaderElection()
+        case PreferredReplicaLeaderElection(partitions, electionType, callback) =>
+          processPreferredReplicaLeaderElection(partitions, electionType, callback)
+        case UncleanLeaderElectionEnable =>
+          processUncleanLeaderElectionEnable()
+        case TopicUncleanLeaderElectionEnable(topic) =>
+          processTopicUncleanLeaderElectionEnable(topic)
+        case ControlledShutdown(id, brokerEpoch, callback) =>
+          processControlledShutdown(id, brokerEpoch, callback)
+        case LeaderAndIsrResponseReceived(response, brokerId) =>
+          processLeaderAndIsrResponseReceived(response, brokerId)
+        case TopicDeletionStopReplicaResponseReceived(replicaId, requestError, partitionErrors) =>
+          processTopicDeletionStopReplicaResponseReceived(replicaId, requestError, partitionErrors)
+        case BrokerChange =>
+          processBrokerChange()
+        case BrokerModifications(brokerId) =>
+          processBrokerModification(brokerId)
+        case ControllerChange =>
+          processControllerChange()
+        case Reelect =>
+          processReelect()
+        case RegisterBrokerAndReelect =>
+          processRegisterBrokerAndReelect()
+        case Expire =>
+          processExpire()
+        case TopicChange =>
+          processTopicChange()
+        case LogDirEventNotification =>
+          processLogDirEventNotification()
+        case PartitionModifications(topic) =>
+          processPartitionModifications(topic)
+        case TopicDeletion =>
+          processTopicDeletion()
+        case PartitionReassignment =>
+          processPartitionReassignment()
+        case PartitionReassignmentIsrChange(partition) =>
+          processPartitionReassignmentIsrChange(partition)
+        case IsrChangeNotification =>
+          processIsrChangeNotification()
+        case Startup =>
+          processStartup()
+      }
+    } catch {
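+      // The controller moved while this event was being processed; resign and let the new controller take over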
+      case e: ControllerMovedException =>
+        info(s"Controller moved to another broker when processing $event.", e)
+        maybeResign()
+      case e: Throwable =>
+        error(s"Error processing event $event", e)
+    } finally {
+      updateMetrics()
     }
+  }
 
-    def waitUntilProcessingStarted(): Unit = {
-      processingStarted.await()
+  override def preempt(event: ControllerEvent): Unit = {
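+    // Only events that carry client callbacks need to be completed on preemption; other events require no action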
+    event match {
+      case PreferredReplicaLeaderElection(partitions, _, callback) =>
+        preemptPreferredReplicaLeaderElection(partitions, callback)
+      case ControlledShutdown(id, brokerEpoch, callback) =>
+        preemptControlledShutdown(id, brokerEpoch, callback)
+      case _ =>
     }
   }
-
 }
 
-class BrokerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
+class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
   override val path: String = BrokerIdsZNode.path
 
   override def handleChildChange(): Unit = {
-    eventManager.put(controller.BrokerChange)
+    eventManager.put(BrokerChange)
   }
 }
 
-class BrokerModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, brokerId: Int) extends ZNodeChangeHandler {
+class BrokerModificationsHandler(eventManager: ControllerEventManager, brokerId: Int) extends ZNodeChangeHandler {
   override val path: String = BrokerIdZNode.path(brokerId)
 
   override def handleDataChange(): Unit = {
-    eventManager.put(controller.BrokerModifications(brokerId))
+    eventManager.put(BrokerModifications(brokerId))
   }
 }
 
-class TopicChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
+class TopicChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
   override val path: String = TopicsZNode.path
 
-  override def handleChildChange(): Unit = eventManager.put(controller.TopicChange)
+  override def handleChildChange(): Unit = eventManager.put(TopicChange)
 }
 
-class LogDirEventNotificationHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
+class LogDirEventNotificationHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
   override val path: String = LogDirEventNotificationZNode.path
 
-  override def handleChildChange(): Unit = eventManager.put(controller.LogDirEventNotification)
+  override def handleChildChange(): Unit = eventManager.put(LogDirEventNotification)
 }
 
 object LogDirEventNotificationHandler {
   val Version: Long = 1L
 }
 
-class PartitionModificationsHandler(controller: KafkaController, eventManager: ControllerEventManager, topic: String) extends ZNodeChangeHandler {
+class PartitionModificationsHandler(eventManager: ControllerEventManager, topic: String) extends ZNodeChangeHandler {
   override val path: String = TopicZNode.path(topic)
 
-  override def handleDataChange(): Unit = eventManager.put(controller.PartitionModifications(topic))
+  override def handleDataChange(): Unit = eventManager.put(PartitionModifications(topic))
 }
 
-class TopicDeletionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
+class TopicDeletionHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
   override val path: String = DeleteTopicsZNode.path
 
-  override def handleChildChange(): Unit = eventManager.put(controller.TopicDeletion)
+  override def handleChildChange(): Unit = eventManager.put(TopicDeletion)
 }
 
-class PartitionReassignmentHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
+class PartitionReassignmentHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
   override val path: String = ReassignPartitionsZNode.path
 
   // Note that the event is also enqueued when the znode is deleted, but we do it explicitly instead of relying on
   // handleDeletion(). This approach is more robust as it doesn't depend on the watcher being re-registered after
   // it's consumed during data changes (we ensure re-registration when the znode is deleted).
-  override def handleCreation(): Unit = eventManager.put(controller.PartitionReassignment)
+  override def handleCreation(): Unit = eventManager.put(PartitionReassignment)
 }
 
-class PartitionReassignmentIsrChangeHandler(controller: KafkaController, eventManager: ControllerEventManager, partition: TopicPartition) extends ZNodeChangeHandler {
+class PartitionReassignmentIsrChangeHandler(eventManager: ControllerEventManager, partition: TopicPartition) extends ZNodeChangeHandler {
   override val path: String = TopicPartitionStateZNode.path(partition)
 
-  override def handleDataChange(): Unit = eventManager.put(controller.PartitionReassignmentIsrChange(partition))
+  override def handleDataChange(): Unit = eventManager.put(PartitionReassignmentIsrChange(partition))
 }
 
-class IsrChangeNotificationHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
+class IsrChangeNotificationHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
   override val path: String = IsrChangeNotificationZNode.path
 
-  override def handleChildChange(): Unit = eventManager.put(controller.IsrChangeNotification)
+  override def handleChildChange(): Unit = eventManager.put(IsrChangeNotification)
 }
 
 object IsrChangeNotificationHandler {
   val Version: Long = 1L
 }
 
-class PreferredReplicaElectionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
+class PreferredReplicaElectionHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
   override val path: String = PreferredReplicaElectionZNode.path
 
-  override def handleCreation(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection(None))
+  override def handleCreation(): Unit = eventManager.put(PreferredReplicaLeaderElection(None))
 }
 
-class ControllerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
+class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
   override val path: String = ControllerZNode.path
 
-  override def handleCreation(): Unit = eventManager.put(controller.ControllerChange)
-  override def handleDeletion(): Unit = eventManager.put(controller.Reelect)
-  override def handleDataChange(): Unit = eventManager.put(controller.ControllerChange)
+  override def handleCreation(): Unit = eventManager.put(ControllerChange)
+  override def handleDeletion(): Unit = eventManager.put(Reelect)
+  override def handleDataChange(): Unit = eventManager.put(ControllerChange)
 }
 
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
-                                       val reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler) {
+                                       reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler) {
 
   def registerReassignIsrChangeHandler(zkClient: KafkaZkClient): Unit =
     zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
@@ -1792,30 +1749,102 @@ private[controller] class ControllerStats extends KafkaMetricsGroup {
 }
 
 sealed trait ControllerEvent {
-  val enqueueTimeMs: Long = Time.SYSTEM.milliseconds()
-
   def state: ControllerState
-  def process(): Unit
 }
 
-/**
-  * A `ControllerEvent`, such as one with a client callback, which needs specific handling in the event of ZK session expiration.
-  */
-sealed trait PreemptableControllerEvent extends ControllerEvent {
+case object ControllerChange extends ControllerEvent {
+  override def state = ControllerState.ControllerChange
+}
 
-  val spent = new AtomicBoolean(false)
+case object Reelect extends ControllerEvent {
+  override def state = ControllerState.ControllerChange
+}
 
-  final def preempt(): Unit = {
-    if (!spent.getAndSet(true))
-      handlePreempt()
-  }
+case object RegisterBrokerAndReelect extends ControllerEvent {
+  override def state: ControllerState = ControllerState.ControllerChange
+}
 
-  final def process(): Unit = {
-    if (!spent.getAndSet(true))
-      handleProcess()
-  }
+case object Expire extends ControllerEvent {
+  override def state = ControllerState.ControllerChange
+}
+
+case object ShutdownEventThread extends ControllerEvent {
+  def state = ControllerState.ControllerShutdown
+}
+
+case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
+  def state = ControllerState.AutoLeaderBalance
+}
+
+case object UncleanLeaderElectionEnable extends ControllerEvent {
+  def state = ControllerState.UncleanLeaderElectionEnable
+}
+
+case class TopicUncleanLeaderElectionEnable(topic: String) extends ControllerEvent {
+  def state = ControllerState.TopicUncleanLeaderElectionEnable
+}
+
+case class ControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
+  def state = ControllerState.ControlledShutdown
+}
+
+case class LeaderAndIsrResponseReceived(leaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
+  def state = ControllerState.LeaderAndIsrResponseReceived
+}
+
+case class TopicDeletionStopReplicaResponseReceived(replicaId: Int,
+                                                    requestError: Errors,
+                                                    partitionErrors: Map[TopicPartition, Errors]) extends ControllerEvent {
+  def state = ControllerState.TopicDeletion
+}
 
-  def handlePreempt(): Unit
+case object Startup extends ControllerEvent {
+  def state = ControllerState.ControllerChange
+}
 
-  def handleProcess(): Unit
+case object BrokerChange extends ControllerEvent {
+  override def state: ControllerState = ControllerState.BrokerChange
+}
+
+case class BrokerModifications(brokerId: Int) extends ControllerEvent {
+  override def state: ControllerState = ControllerState.BrokerChange
+}
+
+case object TopicChange extends ControllerEvent {
+  override def state: ControllerState = ControllerState.TopicChange
+}
+
+case object LogDirEventNotification extends ControllerEvent {
+  override def state: ControllerState = ControllerState.LogDirChange
+}
+
+case class PartitionModifications(topic: String) extends ControllerEvent {
+  override def state: ControllerState = ControllerState.TopicChange
+}
+
+case object TopicDeletion extends ControllerEvent {
+  override def state: ControllerState = ControllerState.TopicDeletion
+}
+
+case object PartitionReassignment extends ControllerEvent {
+  override def state: ControllerState = ControllerState.PartitionReassignment
+}
+
+case class PartitionReassignmentIsrChange(partition: TopicPartition) extends ControllerEvent {
+  override def state: ControllerState = ControllerState.PartitionReassignment
+}
+
+case object IsrChangeNotification extends ControllerEvent {
+  override def state: ControllerState = ControllerState.IsrChange
+}
+
+case class PreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
+                                          electionType: ElectionType = ZkTriggered,
+                                          callback: ElectPreferredLeadersCallback = (_,_) => {}) extends ControllerEvent {
+  override def state: ControllerState = ControllerState.ManualLeaderBalance
+}
+
+// Used only in test cases
+abstract class MockEvent(val state: ControllerState) extends ControllerEvent {
+  def process(): Unit
 }
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 0f56e3a..25d9faf 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -92,8 +92,11 @@ class TopicDeletionManager(config: KafkaConfig,
   val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable
 
   def init(initialTopicsToBeDeleted: Set[String], initialTopicsIneligibleForDeletion: Set[String]): Unit = {
+    info(s"Initializing manager with initial deletions: $initialTopicsToBeDeleted, " +
+      s"initial ineligible deletions: $initialTopicsIneligibleForDeletion")
+
     if (isDeleteTopicEnabled) {
-      controllerContext.topicsToBeDeleted ++= initialTopicsToBeDeleted
+      controllerContext.queueTopicDeletion(initialTopicsToBeDeleted)
       controllerContext.topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & controllerContext.topicsToBeDeleted
     } else {
       // if delete topic is disabled clean the topic entries under /admin/delete_topics
@@ -116,7 +119,7 @@ class TopicDeletionManager(config: KafkaConfig,
    */
   def enqueueTopicsForDeletion(topics: Set[String]) {
     if (isDeleteTopicEnabled) {
-      controllerContext.topicsToBeDeleted ++= topics
+      controllerContext.queueTopicDeletion(topics)
       resumeDeletions()
     }
   }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 71c5f86..cccccfe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -388,7 +388,7 @@ class ReplicaManager(val config: KafkaConfig,
           s"${stopReplicaRequest.controllerEpoch}. Latest known controller epoch is $controllerEpoch")
         (responseMap, Errors.STALE_CONTROLLER_EPOCH)
       } else {
-        val partitions = stopReplicaRequest.partitions.asScala
+        val partitions = stopReplicaRequest.partitions.asScala.toSet
         controllerEpoch = stopReplicaRequest.controllerEpoch
         // First stop fetchers for all partitions, then stop the corresponding replicas
         replicaFetcherManager.removeFetcherForPartitions(partitions)
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
new file mode 100644
index 0000000..9f09231
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import java.util.Properties
+
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, KAFKA_0_10_2_IV0, KAFKA_0_9_0, KAFKA_1_0_IV0, KAFKA_2_2_IV0, LeaderAndIsr}
+import kafka.cluster.{Broker, EndPoint}
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, StopReplicaRequest, StopReplicaResponse, UpdateMetadataRequest}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.Assertions
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+class ControllerChannelManagerTest {
+  private val controllerId = 1
+  private val controllerEpoch = 1
+  private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, "zkConnect"))
+  private val logger = new StateChangeLogger(controllerId, true, None)
+
+  type ControlRequest = AbstractControlRequest.Builder[_ <: AbstractControlRequest]
+
+  @Test
+  def testLeaderAndIsrRequestSent(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val partitions = Map(
+      new TopicPartition("foo", 0) -> LeaderAndIsr(1, List(1, 2)),
+      new TopicPartition("foo", 1) -> LeaderAndIsr(2, List(2, 3)),
+      new TopicPartition("bar", 1) -> LeaderAndIsr(3, List(1, 3))
+    )
+
+    batch.newBatch()
+    partitions.foreach { case (partition, leaderAndIsr) =>
+      val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+      context.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+      batch.addLeaderAndIsrRequestForBrokers(Seq(2), partition, leaderIsrAndControllerEpoch, Seq(1, 2, 3), isNew = false)
+    }
+    batch.sendRequestsToBrokers(controllerEpoch)
+
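+    // The batch should send exactly one LeaderAndIsr request and one accompanying UpdateMetadata request to broker 2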
+    val leaderAndIsrRequests = batch.collectLeaderAndIsrRequestsFor(2)
+    val updateMetadataRequests = batch.collectUpdateMetadataRequestsFor(2)
+    assertEquals(1, leaderAndIsrRequests.size)
+    assertEquals(1, updateMetadataRequests.size)
+
+    val leaderAndIsrRequest = leaderAndIsrRequests.head
+    assertEquals(controllerId, leaderAndIsrRequest.controllerId)
+    assertEquals(controllerEpoch, leaderAndIsrRequest.controllerEpoch)
+    assertEquals(partitions.keySet, leaderAndIsrRequest.partitionStates.keySet.asScala)
+    assertEquals(partitions.mapValues(_.leader),
+      leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.leader))
+    assertEquals(partitions.mapValues(_.isr),
+      leaderAndIsrRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala))
+
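+    // Completing the response callbacks should enqueue a LeaderAndIsrResponseReceived event for the broker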
+    applyLeaderAndIsrResponseCallbacks(Errors.NONE, batch.sentRequests(2).toList)
+    assertEquals(1, batch.sentEvents.size)
+
+    val LeaderAndIsrResponseReceived(response, brokerId) = batch.sentEvents.head
+    assertEquals(2, brokerId)
+    assertEquals(partitions.keySet, response.asInstanceOf[LeaderAndIsrResponse].responses.keySet.asScala)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestIsNew(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val partition = new TopicPartition("foo", 0)
+    val leaderAndIsr = LeaderAndIsr(1, List(1, 2))
+
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    context.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    batch.newBatch()
+    batch.addLeaderAndIsrRequestForBrokers(Seq(2), partition, leaderIsrAndControllerEpoch, Seq(1, 2, 3), isNew = true)
+    batch.addLeaderAndIsrRequestForBrokers(Seq(2), partition, leaderIsrAndControllerEpoch, Seq(1, 2, 3), isNew = false)
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    val leaderAndIsrRequests = batch.collectLeaderAndIsrRequestsFor(2)
+    val updateMetadataRequests = batch.collectUpdateMetadataRequestsFor(2)
+    assertEquals(1, leaderAndIsrRequests.size)
+    assertEquals(1, updateMetadataRequests.size)
+
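+    // isNew should stick even though the second add for the same partition passed isNew = false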
+    val leaderAndIsrRequest = leaderAndIsrRequests.head
+    assertEquals(Set(partition), leaderAndIsrRequest.partitionStates.keySet.asScala)
+    assertTrue(leaderAndIsrRequest.partitionStates.get(partition).isNew)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestSentToLiveOrShuttingDownBrokers(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    // 2 is shutting down, 3 is dead
+    context.shuttingDownBrokerIds.add(2)
+    context.removeLiveBrokers(Set(3))
+
+    val partition = new TopicPartition("foo", 0)
+    val leaderAndIsr = LeaderAndIsr(1, List(1, 2))
+
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    context.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    batch.newBatch()
+    batch.addLeaderAndIsrRequestForBrokers(Seq(1, 2, 3), partition, leaderIsrAndControllerEpoch, Seq(1, 2, 3), isNew = false)
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(2, batch.sentRequests.size)
+    assertEquals(Set(1, 2), batch.sentRequests.keySet)
+
+    for (brokerId <- Set(1, 2)) {
+      val leaderAndIsrRequests = batch.collectLeaderAndIsrRequestsFor(brokerId)
+      val updateMetadataRequests = batch.collectUpdateMetadataRequestsFor(brokerId)
+      assertEquals(1, leaderAndIsrRequests.size)
+      assertEquals(1, updateMetadataRequests.size)
+      val leaderAndIsrRequest = leaderAndIsrRequests.head
+      assertEquals(Set(partition), leaderAndIsrRequest.partitionStates.keySet.asScala)
+    }
+  }
+
+  @Test
+  def testLeaderAndIsrInterBrokerProtocolVersion(): Unit = {
+    testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(ApiVersion.latestVersion, ApiKeys.LEADER_AND_ISR.latestVersion)
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val leaderAndIsrRequestVersion: Short =
+        if (apiVersion >= KAFKA_2_2_IV0) 2
+        else if (apiVersion >= KAFKA_1_0_IV0) 1
+        else 0
+
+      testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(apiVersion, leaderAndIsrRequestVersion)
+    }
+  }
+
+  private def testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
+                                                                       expectedLeaderAndIsrVersion: Short): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val config = createConfig(interBrokerProtocolVersion)
+    val batch = new MockControllerBrokerRequestBatch(context, config)
+
+    val partition = new TopicPartition("foo", 0)
+    val leaderAndIsr = LeaderAndIsr(1, List(1, 2))
+
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    context.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    batch.newBatch()
+    batch.addLeaderAndIsrRequestForBrokers(Seq(2), partition, leaderIsrAndControllerEpoch, Seq(1, 2, 3), isNew = false)
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    val leaderAndIsrRequests = batch.collectLeaderAndIsrRequestsFor(2, expectedLeaderAndIsrVersion)
+    assertEquals(1, leaderAndIsrRequests.size)
+    assertEquals(expectedLeaderAndIsrVersion, leaderAndIsrRequests.head.version)
+  }
+
+  @Test
+  def testUpdateMetadataRequestSent(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val partitions = Map(
+      new TopicPartition("foo", 0) -> LeaderAndIsr(1, List(1, 2)),
+      new TopicPartition("foo", 1) -> LeaderAndIsr(2, List(2, 3)),
+      new TopicPartition("bar", 1) -> LeaderAndIsr(3, List(1, 3))
+    )
+
+    partitions.foreach { case (partition, leaderAndIsr) =>
+      context.partitionLeadershipInfo.put(partition, LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch))
+    }
+
+    batch.newBatch()
+    batch.addUpdateMetadataRequestForBrokers(Seq(2), partitions.keySet)
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    val updateMetadataRequests = batch.collectUpdateMetadataRequestsFor(2)
+    assertEquals(1, updateMetadataRequests.size)
+
+    val updateMetadataRequest = updateMetadataRequests.head
+    assertEquals(3, updateMetadataRequest.partitionStates.size)
+    assertEquals(partitions.mapValues(_.leader), updateMetadataRequest.partitionStates.asScala.mapValues(_.basePartitionState.leader))
+    assertEquals(partitions.mapValues(_.isr), updateMetadataRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala))
+
+    assertEquals(controllerId, updateMetadataRequest.controllerId)
+    assertEquals(controllerEpoch, updateMetadataRequest.controllerEpoch)
+    assertEquals(3, updateMetadataRequest.liveBrokers.size)
+    assertEquals(Set(1, 2, 3), updateMetadataRequest.liveBrokers.asScala.map(_.id).toSet)
+  }
+
+  @Test
+  def testUpdateMetadataDoesNotIncludePartitionsWithoutLeaderAndIsr(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val partitions = Set(
+      new TopicPartition("foo", 0),
+      new TopicPartition("foo", 1),
+      new TopicPartition("bar", 1)
+    )
+
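+    // None of these partitions have leadership info in the context, so all should be excluded from the request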
+    batch.newBatch()
+    batch.addUpdateMetadataRequestForBrokers(Seq(2), partitions)
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(1, batch.sentRequests.size)
+    assertTrue(batch.sentRequests.contains(2))
+
+    val updateMetadataRequests = batch.collectUpdateMetadataRequestsFor(2)
+    assertEquals(1, updateMetadataRequests.size)
+
+    val updateMetadataRequest = updateMetadataRequests.head
+    assertEquals(0, updateMetadataRequest.partitionStates.size)
+    assertEquals(3, updateMetadataRequest.liveBrokers.size)
+    assertEquals(Set(1, 2, 3), updateMetadataRequest.liveBrokers.asScala.map(_.id).toSet)
+  }
+
+  @Test
+  def testUpdateMetadataRequestDuringTopicDeletion(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val partitions = Map(
+      new TopicPartition("foo", 0) -> LeaderAndIsr(1, List(1, 2)),
+      new TopicPartition("foo", 1) -> LeaderAndIsr(2, List(2, 3)),
+      new TopicPartition("bar", 1) -> LeaderAndIsr(3, List(1, 3))
+    )
+
+    partitions.foreach { case (partition, leaderAndIsr) =>
+      context.partitionLeadershipInfo.put(partition, LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch))
+    }
+
+    context.queueTopicDeletion(Set("foo"))
+
+    batch.newBatch()
+    batch.addUpdateMetadataRequestForBrokers(Seq(2), partitions.keySet)
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    val updateMetadataRequests = batch.collectUpdateMetadataRequestsFor(2)
+    assertEquals(1, updateMetadataRequests.size)
+
+    val updateMetadataRequest = updateMetadataRequests.head
+    assertEquals(3, updateMetadataRequest.partitionStates.size)
+
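+    // Partitions of a topic queued for deletion should advertise LeaderAndIsr.LeaderDuringDelete as their leader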
+    assertTrue(updateMetadataRequest.partitionStates.asScala
+      .filterKeys(_.topic == "foo")
+      .values
+      .map(_.basePartitionState.leader)
+      .forall(leaderId => leaderId == LeaderAndIsr.LeaderDuringDelete))
+
+    assertEquals(partitions.filterKeys(_.topic == "bar").mapValues(_.leader),
+      updateMetadataRequest.partitionStates.asScala.filterKeys(_.topic == "bar").mapValues(_.basePartitionState.leader))
+    assertEquals(partitions.mapValues(_.isr), updateMetadataRequest.partitionStates.asScala.mapValues(_.basePartitionState.isr.asScala))
+
+    assertEquals(3, updateMetadataRequest.liveBrokers.size)
+    assertEquals(Set(1, 2, 3), updateMetadataRequest.liveBrokers.asScala.map(_.id).toSet)
+  }
+
+  @Test
+  def testUpdateMetadataIncludesLiveOrShuttingDownBrokers(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    // 2 is shutting down, 3 is dead
+    context.shuttingDownBrokerIds.add(2)
+    context.removeLiveBrokers(Set(3))
+
+    batch.newBatch()
+    batch.addUpdateMetadataRequestForBrokers(Seq(1, 2, 3), Set.empty)
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(Set(1, 2), batch.sentRequests.keySet)
+
+    for (brokerId <- Set(1, 2)) {
+      val updateMetadataRequests = batch.collectUpdateMetadataRequestsFor(brokerId)
+      assertEquals(1, updateMetadataRequests.size)
+
+      val updateMetadataRequest = updateMetadataRequests.head
+      assertEquals(0, updateMetadataRequest.partitionStates.size)
+      assertEquals(2, updateMetadataRequest.liveBrokers.size)
+      assertEquals(Set(1, 2), updateMetadataRequest.liveBrokers.asScala.map(_.id).toSet)
+    }
+  }
+
+  @Test
+  def testUpdateMetadataInterBrokerProtocolVersion(): Unit = {
+    testUpdateMetadataFollowsInterBrokerProtocolVersion(ApiVersion.latestVersion, ApiKeys.UPDATE_METADATA.latestVersion)
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val updateMetadataRequestVersion: Short =
+        if (apiVersion >= KAFKA_2_2_IV0) 5
+        else if (apiVersion >= KAFKA_1_0_IV0) 4
+        else if (apiVersion >= KAFKA_0_10_2_IV0) 3
+        else if (apiVersion >= KAFKA_0_10_0_IV1) 2
+        else if (apiVersion >= KAFKA_0_9_0) 1
+        else 0
+
+      testUpdateMetadataFollowsInterBrokerProtocolVersion(apiVersion, updateMetadataRequestVersion)
+    }
+  }
+
+  private def testUpdateMetadataFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
+                                                                  expectedUpdateMetadataVersion: Short): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val config = createConfig(interBrokerProtocolVersion)
+    val batch = new MockControllerBrokerRequestBatch(context, config)
+
+    batch.newBatch()
+    batch.addUpdateMetadataRequestForBrokers(Seq(2), Set.empty)
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(1, batch.sentRequests.size)
+    assertTrue(batch.sentRequests.contains(2))
+
+    val requests = batch.collectUpdateMetadataRequestsFor(2, expectedUpdateMetadataVersion)
+    assertTrue(requests.forall(_.version == expectedUpdateMetadataVersion))
+  }
+
+  @Test
+  def testStopReplicaRequestSent(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val partitions = Set(
+      new TopicPartition("foo", 0),
+      new TopicPartition("foo", 1),
+      new TopicPartition("bar", 1)
+    )
+
+    batch.newBatch()
+    partitions.foreach { partition =>
+      batch.addStopReplicaRequestForBrokers(Seq(2), partition, deletePartition = false)
+    }
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(1, batch.sentRequests.size)
+    assertTrue(batch.sentRequests.contains(2))
+
+    val sentRequests = batch.sentRequests(2)
+    assertEquals(1, sentRequests.size)
+
+    val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
+    assertEquals(1, sentStopReplicaRequests.size)
+
+    val stopReplicaRequest = sentStopReplicaRequests.head
+    assertFalse(stopReplicaRequest.deletePartitions())
+    assertEquals(partitions, stopReplicaRequest.partitions.asScala.toSet)
+
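+    // No topic is being deleted, so the response callback should not enqueue any event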
+    applyStopReplicaResponseCallbacks(Errors.NONE, batch.sentRequests(2).toList)
+    assertEquals(0, batch.sentEvents.size)
+  }
+
+  @Test
+  def testStopReplicaRequestsWhileTopicQueuedForDeletion(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val partitions = Set(
+      new TopicPartition("foo", 0),
+      new TopicPartition("foo", 1),
+      new TopicPartition("bar", 1)
+    )
+
+    // Topic deletion is queued, but has not begun
+    context.queueTopicDeletion(Set("foo"))
+
+    batch.newBatch()
+    partitions.foreach { partition =>
+      batch.addStopReplicaRequestForBrokers(Seq(2), partition, deletePartition = true)
+    }
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(1, batch.sentRequests.size)
+    assertTrue(batch.sentRequests.contains(2))
+
+    val sentRequests = batch.sentRequests(2)
+    assertEquals(1, sentRequests.size)
+
+    val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
+    assertEquals(1, sentStopReplicaRequests.size)
+    assertEquals(partitions, sentStopReplicaRequests.flatMap(_.partitions.asScala).toSet)
+    assertTrue(sentStopReplicaRequests.forall(_.deletePartitions()))
+
+    // No events will be sent after the response returns
+    applyStopReplicaResponseCallbacks(Errors.NONE, batch.sentRequests(2).toList)
+    assertEquals(0, batch.sentEvents.size)
+  }
+
+  @Test
+  def testStopReplicaRequestsWhileTopicDeletionStarted(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val partitions = Set(
+      new TopicPartition("foo", 0),
+      new TopicPartition("foo", 1),
+      new TopicPartition("bar", 1)
+    )
+
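+    // Deletion of "foo" has been queued and has also started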
+    context.queueTopicDeletion(Set("foo"))
+    context.beginTopicDeletion(Set("foo"))
+
+    batch.newBatch()
+    partitions.foreach { partition =>
+      batch.addStopReplicaRequestForBrokers(Seq(2), partition, deletePartition = true)
+    }
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(1, batch.sentRequests.size)
+    assertTrue(batch.sentRequests.contains(2))
+
+    val sentRequests = batch.sentRequests(2)
+    assertEquals(1, sentRequests.size)
+
+    val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
+    assertEquals(1, sentStopReplicaRequests.size)
+    assertEquals(partitions, sentStopReplicaRequests.flatMap(_.partitions.asScala).toSet)
+    assertTrue(sentStopReplicaRequests.forall(_.deletePartitions()))
+
+    // While the topic is being deleted, the response callback should enqueue a
+    // TopicDeletionStopReplicaResponseReceived event for the StopReplica response
+    applyStopReplicaResponseCallbacks(Errors.NONE, batch.sentRequests(2).toList)
+    assertEquals(1, batch.sentEvents.size)
+
+    // We should only receive events for the topic being deleted
+    val includedPartitions = batch.sentEvents.flatMap {
+      case event: TopicDeletionStopReplicaResponseReceived => event.partitionErrors.keySet
+      case otherEvent => Assertions.fail(s"Unexpected sent event: $otherEvent")
+    }.toSet
+    assertEquals(partitions.filter(_.topic == "foo"), includedPartitions)
+  }
+
+  @Test
+  def testMixedDeleteAndNotDeleteStopReplicaRequests(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val deletePartitions = Set(
+      new TopicPartition("foo", 0),
+      new TopicPartition("foo", 1)
+    )
+
+    val nonDeletePartitions = Set(
+      new TopicPartition("bar", 0),
+      new TopicPartition("bar", 1)
+    )
+
+    batch.newBatch()
+    deletePartitions.foreach { partition =>
+      batch.addStopReplicaRequestForBrokers(Seq(2), partition, deletePartition = true)
+    }
+    nonDeletePartitions.foreach { partition =>
+      batch.addStopReplicaRequestForBrokers(Seq(2), partition, deletePartition = false)
+    }
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(1, batch.sentRequests.size)
+    assertTrue(batch.sentRequests.contains(2))
+
+    val sentRequests = batch.sentRequests(2)
+    assertEquals(2, sentRequests.size)
+
+    val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
+    assertEquals(2, sentStopReplicaRequests.size)
+
+    val (deleteRequests, nonDeleteRequests) = sentStopReplicaRequests.partition(_.deletePartitions())
+    assertEquals(1, deleteRequests.size)
+    assertEquals(deletePartitions, deleteRequests.head.partitions.asScala.toSet)
+    assertEquals(1, nonDeleteRequests.size)
+    assertEquals(nonDeletePartitions, nonDeleteRequests.head.partitions.asScala.toSet)
+  }
+
+  @Test
+  def testStopReplicaGroupsByBroker(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    val partitions = Set(
+      new TopicPartition("foo", 0),
+      new TopicPartition("foo", 1),
+      new TopicPartition("bar", 1)
+    )
+
+    batch.newBatch()
+    partitions.foreach { partition =>
+      batch.addStopReplicaRequestForBrokers(Seq(2, 3), partition, deletePartition = false)
+    }
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(2, batch.sentRequests.size)
+    assertTrue(batch.sentRequests.contains(2))
+    assertTrue(batch.sentRequests.contains(3))
+
+    val sentRequests = batch.sentRequests(2)
+    assertEquals(1, sentRequests.size)
+
+    for (brokerId <- Set(2, 3)) {
+      val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(brokerId)
+      assertEquals(1, sentStopReplicaRequests.size)
+
+      val stopReplicaRequest = sentStopReplicaRequests.head
+      assertFalse(stopReplicaRequest.deletePartitions())
+      assertEquals(partitions, stopReplicaRequest.partitions.asScala.toSet)
+
+      applyStopReplicaResponseCallbacks(Errors.NONE, batch.sentRequests(brokerId).toList)
+      assertEquals(0, batch.sentEvents.size)
+    }
+  }
+
+  @Test
+  def testStopReplicaSentOnlyToLiveAndShuttingDownBrokers(): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo", "bar"), 2, 3)
+    val batch = new MockControllerBrokerRequestBatch(context)
+
+    // 2 is shutting down, 3 is dead
+    context.shuttingDownBrokerIds.add(2)
+    context.removeLiveBrokers(Set(3))
+
+    val partitions = Set(
+      new TopicPartition("foo", 0),
+      new TopicPartition("foo", 1),
+      new TopicPartition("bar", 1)
+    )
+
+    batch.newBatch()
+    partitions.foreach { partition =>
+      batch.addStopReplicaRequestForBrokers(Seq(2, 3), partition, deletePartition = false)
+    }
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(1, batch.sentRequests.size)
+    assertTrue(batch.sentRequests.contains(2))
+
+    val sentRequests = batch.sentRequests(2)
+    assertEquals(1, sentRequests.size)
+
+    val sentStopReplicaRequests = batch.collectStopReplicaRequestsFor(2)
+    assertEquals(1, sentStopReplicaRequests.size)
+
+    val stopReplicaRequest = sentStopReplicaRequests.head
+    assertFalse(stopReplicaRequest.deletePartitions())
+    assertEquals(partitions, stopReplicaRequest.partitions.asScala.toSet)
+  }
+
+  @Test
+  def testStopReplicaInterBrokerProtocolVersion(): Unit = {
+    testStopReplicaFollowsInterBrokerProtocolVersion(ApiVersion.latestVersion, ApiKeys.STOP_REPLICA.latestVersion)
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      if (apiVersion < KAFKA_2_2_IV0)
+        testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 0.toShort)
+      else
+        testStopReplicaFollowsInterBrokerProtocolVersion(apiVersion, 1.toShort)
+    }
+  }
+
+  private def testStopReplicaFollowsInterBrokerProtocolVersion(interBrokerProtocolVersion: ApiVersion,
+                                                               expectedStopReplicaRequestVersion: Short): Unit = {
+    val context = initContext(Seq(1, 2, 3), Set("foo"), 2, 3)
+    val config = createConfig(interBrokerProtocolVersion)
+    val batch = new MockControllerBrokerRequestBatch(context, config)
+
+    val partition = new TopicPartition("foo", 0)
+
+    batch.newBatch()
+    batch.addStopReplicaRequestForBrokers(Seq(2), partition, deletePartition = false)
+    batch.sendRequestsToBrokers(controllerEpoch)
+
+    assertEquals(0, batch.sentEvents.size)
+    assertEquals(1, batch.sentRequests.size)
+    assertTrue(batch.sentRequests.contains(2))
+
+    val requests = batch.collectStopReplicaRequestsFor(2, expectedStopReplicaRequestVersion)
+    assertTrue(requests.forall(_.version() == expectedStopReplicaRequestVersion))
+  }
+
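+  // Simulates broker responses by invoking each captured StopReplica callback with a response mapping every partition to the given error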
+  private def applyStopReplicaResponseCallbacks(error: Errors, sentRequests: List[SentRequest]): Unit = {
+    sentRequests.filter(_.request.apiKey == ApiKeys.STOP_REPLICA).filter(_.responseCallback != null).foreach { sentRequest =>
+      val stopReplicaRequest = sentRequest.request.build().asInstanceOf[StopReplicaRequest]
+      val partitionErrorMap = stopReplicaRequest.partitions.asScala.map(_ -> error).toMap.asJava
+      val stopReplicaResponse = new StopReplicaResponse(error, partitionErrorMap)
+      sentRequest.responseCallback.apply(stopReplicaResponse)
+    }
+  }
+
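+  // Simulates broker responses to LeaderAndIsr requests, mapping every partition in the request to the given error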
+  private def applyLeaderAndIsrResponseCallbacks(error: Errors, sentRequests: List[SentRequest]): Unit = {
+    sentRequests.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach { sentRequest =>
+      val leaderAndIsrRequest = sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest]
+      val partitionErrorMap = leaderAndIsrRequest.partitionStates.asScala.keySet.map(_ -> error).toMap.asJava
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(error, partitionErrorMap)
+      sentRequest.responseCallback.apply(leaderAndIsrResponse)
+    }
+  }
+
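+  // Builds a minimal broker config pinned to the given inter-broker protocol version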
+  private def createConfig(interBrokerVersion: ApiVersion): KafkaConfig = {
+    val props = new Properties()
+    props.put(KafkaConfig.BrokerIdProp, controllerId.toString)
+    props.put(KafkaConfig.ZkConnectProp, "zkConnect")
+    props.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerVersion.version)
+    KafkaConfig.fromProps(props)
+  }
+
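+  // Builds a ControllerContext with the given live brokers and a round-robin replica assignment for each topic partition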
+  private def initContext(brokers: Seq[Int],
+                          topics: Set[String],
+                          numPartitions: Int,
+                          replicationFactor: Int): ControllerContext = {
+    val context = new ControllerContext
+    val brokerEpochs = brokers.map { brokerId =>
+      val endpoint = new EndPoint("localhost", 9900 + brokerId, new ListenerName("PLAINTEXT"),
+        SecurityProtocol.PLAINTEXT)
+      Broker(brokerId, Seq(endpoint), rack = None) -> 1L
+    }.toMap
+
+    context.setLiveBrokerAndEpochs(brokerEpochs)
+
+    // Simple round-robin replica assignment
+    var leaderIndex = 0
+    for (topic <- topics; partitionId <- 0 until numPartitions) {
+      val partition = new TopicPartition(topic, partitionId)
+      val replicas = (0 until replicationFactor).map { i =>
+        brokers((i + leaderIndex) % brokers.size)
+      }
+      context.updatePartitionReplicaAssignment(partition, replicas)
+      leaderIndex += 1
+    }
+    context
+  }
+
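+  // Pairs a queued control request with the callback that handles its response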
+  private case class SentRequest(request: ControlRequest, responseCallback: AbstractResponse => Unit)
+
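+  // Test double that records controller events and per-broker requests in memory rather than sending them over the network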
+  private class MockControllerBrokerRequestBatch(context: ControllerContext, config: KafkaConfig = config)
+    extends AbstractControllerBrokerRequestBatch(config, context, logger) {
+
+    val sentEvents = ListBuffer.empty[ControllerEvent]
+    val sentRequests = mutable.Map.empty[Int, ListBuffer[SentRequest]]
+
+    override def sendEvent(event: ControllerEvent): Unit = {
+      sentEvents.append(event)
+    }
+
+    override def sendRequest(brokerId: Int, request: ControlRequest, callback: AbstractResponse => Unit): Unit = {
+      sentRequests.getOrElseUpdate(brokerId, ListBuffer.empty).append(SentRequest(request, callback))
+    }
+
+    def collectStopReplicaRequestsFor(brokerId: Int,
+                                      version: Short = ApiKeys.STOP_REPLICA.latestVersion): List[StopReplicaRequest] = {
+      sentRequests.get(brokerId) match {
+        case Some(requests) => requests
+          .filter(_.request.apiKey == ApiKeys.STOP_REPLICA)
+          .map(_.request.build(version).asInstanceOf[StopReplicaRequest]).toList
+        case None => List.empty[StopReplicaRequest]
+      }
+    }
+
+    def collectUpdateMetadataRequestsFor(brokerId: Int,
+                                         version: Short = ApiKeys.UPDATE_METADATA.latestVersion): List[UpdateMetadataRequest] = {
+      sentRequests.get(brokerId) match {
+        case Some(requests) => requests
+          .filter(_.request.apiKey == ApiKeys.UPDATE_METADATA)
+          .map(_.request.build(version).asInstanceOf[UpdateMetadataRequest]).toList
+        case None => List.empty[UpdateMetadataRequest]
+      }
+    }
+
+    def collectLeaderAndIsrRequestsFor(brokerId: Int,
+                                       version: Short = ApiKeys.LEADER_AND_ISR.latestVersion): List[LeaderAndIsrRequest] = {
+      sentRequests.get(brokerId) match {
+        case Some(requests) => requests
+          .filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR)
+          .map(_.request.build(version).asInstanceOf[LeaderAndIsrRequest]).toList
+        case None => List.empty[LeaderAndIsrRequest]
+      }
+    }
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index e0a753c..fef9bd1 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -21,8 +21,9 @@ import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicInteger
 
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.Timer
+import com.yammer.metrics.core.{Histogram, Timer}
 import kafka.utils.TestUtils
+import org.apache.kafka.common.utils.MockTime
 import org.junit.{After, Test}
 import org.junit.Assert.{assertEquals, fail}
 
@@ -39,36 +40,76 @@ class ControllerEventManagerTest {
   }
 
   @Test
+  def testEventQueueTime(): Unit = {
+    val controllerStats = new ControllerStats
+    val time = new MockTime()
+    val latch = new CountDownLatch(1)
+
+    val eventProcessor = new ControllerEventProcessor {
+      override def process(event: ControllerEvent): Unit = {
+        latch.await()
+        time.sleep(500)
+      }
+      override def preempt(event: ControllerEvent): Unit = {}
+    }
+
+    controllerEventManager = new ControllerEventManager(0, eventProcessor,
+      time, controllerStats.rateAndTimeMetrics)
+    controllerEventManager.start()
+
+    controllerEventManager.put(TopicChange)
+    controllerEventManager.put(TopicChange)
+    latch.countDown()
+
+    val metricName = "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs"
+    val queueTimeHistogram = Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.headOption
+      .getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Histogram]
+
+    TestUtils.waitUntilTrue(() => controllerEventManager.isEmpty,
+      "Timed out waiting for processing of all events")
+
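+    // The first event is dequeued immediately (queue time 0), while the second waits the full 500 ms the first spends in process()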
+    assertEquals(2, queueTimeHistogram.count)
+    assertEquals(0, queueTimeHistogram.min, 0.01)
+    assertEquals(500, queueTimeHistogram.max, 0.01)
+  }
+
+  @Test
   def testSuccessfulEvent(): Unit = {
-    check("kafka.controller:type=ControllerStats,name=AutoLeaderBalanceRateAndTimeMs", ControllerState.AutoLeaderBalance,
-      () => Unit)
+    check("kafka.controller:type=ControllerStats,name=AutoLeaderBalanceRateAndTimeMs",
+      AutoPreferredReplicaLeaderElection, () => ())
   }
 
   @Test
   def testEventThatThrowsException(): Unit = {
-    check("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs", ControllerState.BrokerChange,
-      () => throw new NullPointerException)
+    check("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs",
+      BrokerChange, () => throw new NullPointerException)
   }
 
-  private def check(metricName: String, controllerState: ControllerState, process: () => Unit): Unit = {
+  private def check(metricName: String,
+                    event: ControllerEvent,
+                    func: () => Unit): Unit = {
     val controllerStats = new ControllerStats
     val eventProcessedListenerCount = new AtomicInteger
-    controllerEventManager = new ControllerEventManager(0, controllerStats.rateAndTimeMetrics,
-      _ => eventProcessedListenerCount.incrementAndGet, () => ())
+    val latch = new CountDownLatch(1)
+    val eventProcessor = new ControllerEventProcessor {
+      override def process(event: ControllerEvent): Unit = {
+        // Only return from `process()` once we have checked `controllerEventManager.state`
+        latch.await()
+        eventProcessedListenerCount.incrementAndGet()
+        func()
+      }
+      override def preempt(event: ControllerEvent): Unit = {}
+    }
+
+    controllerEventManager = new ControllerEventManager(0, eventProcessor,
+      new MockTime(), controllerStats.rateAndTimeMetrics)
     controllerEventManager.start()
 
     val initialTimerCount = timer(metricName).count
 
-    // Only return from `process()` once we have checked `controllerEventManager.state`
-    val latch = new CountDownLatch(1)
-    val eventMock = ControllerTestUtils.createMockControllerEvent(controllerState, { () =>
-      latch.await()
-      process()
-    })
-
-    controllerEventManager.put(eventMock)
-    TestUtils.waitUntilTrue(() => controllerEventManager.state == controllerState,
-      s"Controller state is not $controllerState")
+    controllerEventManager.put(event)
+    TestUtils.waitUntilTrue(() => controllerEventManager.state == event.state,
+      s"Controller state is not ${event.state}")
     latch.countDown()
 
     TestUtils.waitUntilTrue(() => controllerEventManager.state == ControllerState.Idle,
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 44bbca3..e19c5d4 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -19,6 +19,7 @@ package kafka.controller
 
 import java.util.Properties
 import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicReference
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
@@ -68,20 +69,22 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
 
     // Wait until we have verified that we have resigned
     val latch = new CountDownLatch(1)
-    @volatile var exceptionThrown: Option[Throwable] = None
-    val illegalStateEvent = ControllerTestUtils.createMockControllerEvent(ControllerState.BrokerChange, { () =>
-      try initialController.handleIllegalState(new IllegalStateException("Thrown for test purposes"))
-      catch {
-        case t: Throwable => exceptionThrown = Some(t)
+    val exceptionThrown = new AtomicReference[Throwable]()
+    val illegalStateEvent = new MockEvent(ControllerState.BrokerChange) {
+      override def process(): Unit = {
+        try initialController.handleIllegalState(new IllegalStateException("Thrown for test purposes"))
+        catch {
+          case t: Throwable => exceptionThrown.set(t)
+        }
+        latch.await()
       }
-      latch.await()
-    })
+    }
     initialController.eventManager.put(illegalStateEvent)
     // Check that we have shutdown the scheduler (via onControllerResigned)
     TestUtils.waitUntilTrue(() => !initialController.kafkaScheduler.isStarted, "Scheduler was not shutdown")
     TestUtils.waitUntilTrue(() => !initialController.isActive, "Controller did not become inactive")
     latch.countDown()
-    TestUtils.waitUntilTrue(() => exceptionThrown.isDefined, "handleIllegalState did not throw an exception")
+    TestUtils.waitUntilTrue(() => Option(exceptionThrown.get()).isDefined, "handleIllegalState did not throw an exception")
     assertTrue(s"handleIllegalState should throw an IllegalStateException, but $exceptionThrown was thrown",
       exceptionThrown.get.isInstanceOf[IllegalStateException])
 
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index ebb9b4a..4ecabf5 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -37,7 +37,7 @@ import org.scalatest.Assertions.fail
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 class ControllerIntegrationTest extends ZooKeeperTestHarness {
   var servers = Seq.empty[KafkaServer]
@@ -413,7 +413,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     assertEquals(2, partitionStateInfo.basePartitionState.isr.size)
     assertEquals(List(0,1), partitionStateInfo.basePartitionState.isr.asScala)
     controller.controlledShutdown(1, servers.find(_.config.brokerId == 1).get.kafkaController.brokerEpoch, controlledShutdownCallback)
-    partitionsRemaining = resultQueue.take().get
+    partitionsRemaining = resultQueue.take() match {
+      case Success(partitions) => partitions
+      case Failure(exception) => fail("Controlled shutdown failed due to error", exception)
+    }
     assertEquals(0, partitionsRemaining.size)
     activeServers = servers.filter(s => s.config.brokerId == 0)
     partitionStateInfo = activeServers.head.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get
@@ -517,7 +520,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
 
     // Let the controller event thread await on a latch until broker bounce finishes.
     // This is used to simulate fast broker bounce
-    controller.eventManager.put(KafkaController.AwaitOnLatch(latch))
+
+    controller.eventManager.put(new MockEvent(ControllerState.TopicChange) {
+      override def process(): Unit = latch.await()
+    })
 
     otherBroker.shutdown()
     otherBroker.startup()
@@ -536,7 +542,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   private def testControllerMove(fun: () => Unit): Unit = {
     val controller = getController().kafkaController
     val appender = LogCaptureAppender.createAndRegister()
-    val previousLevel = LogCaptureAppender.setClassLoggerLevel(controller.eventManager.thread.getClass, Level.INFO)
+    val previousLevel = LogCaptureAppender.setClassLoggerLevel(controller.getClass, Level.INFO)
 
     try {
       TestUtils.waitUntilTrue(() => {
@@ -547,7 +553,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
 
       // Let the controller event thread await on a latch before the pre-defined logic is triggered.
       // This is used to make sure that when the event thread resumes and starts processing events, the controller has already moved.
-      controller.eventManager.put(KafkaController.AwaitOnLatch(latch))
+      controller.eventManager.put(new MockEvent(ControllerState.TopicChange) {
+        override def process(): Unit = latch.await()
+      })
+
       // Execute pre-defined logic. This can be topic creation/deletion, preferred leader election, etc.
       fun()
 
@@ -560,9 +569,11 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
       TestUtils.waitUntilTrue(() => !controller.isActive, "Controller fails to resign")
 
       // Expect to capture the ControllerMovedException in the log of ControllerEventThread
       val event = appender.getMessages.find(e => e.getLevel == Level.INFO
         && e.getThrowableInformation != null
-        && e.getThrowableInformation.getThrowable.getClass.getName.equals(new ControllerMovedException("").getClass.getName))
+        && e.getThrowableInformation.getThrowable.getClass.getName.equals(classOf[ControllerMovedException].getName))
       assertTrue(event.isDefined)
 
     } finally {
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
deleted file mode 100644
index 84b956d..0000000
--- a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.controller
-
-import org.easymock.{EasyMock, IAnswer}
-
-object ControllerTestUtils {
-
-  /** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */
-  def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = {
-    val mockEvent: ControllerEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
-    EasyMock.expect(mockEvent.state).andReturn(controllerState)
-    EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
-      def answer(): Unit = {
-        process()
-      }
-    })
-    EasyMock.replay(mockEvent)
-    mockEvent
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
index e6297c0..e3e1996 100644
--- a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
@@ -33,6 +33,30 @@ class TopicDeletionManagerTest {
   private val deletionClient = mock(classOf[DeletionClient])
 
   @Test
+  def testInitialization(): Unit = {
+    val controllerContext = initContext(
+      brokers = Seq(1, 2, 3),
+      topics = Set("foo", "bar", "baz"),
+      numPartitions = 2,
+      replicationFactor = 3)
+
+    val replicaStateMachine = new MockReplicaStateMachine(controllerContext)
+    replicaStateMachine.startup()
+
+    val partitionStateMachine = new MockPartitionStateMachine(controllerContext, uncleanLeaderElectionEnabled = false)
+    partitionStateMachine.startup()
+
+    val deletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
+      partitionStateMachine, deletionClient)
+
+    assertTrue(deletionManager.isDeleteTopicEnabled)
+    deletionManager.init(initialTopicsToBeDeleted = Set("foo", "bar"), initialTopicsIneligibleForDeletion = Set("bar", "baz"))
+
+    assertEquals(Set("foo", "bar"), controllerContext.topicsToBeDeleted.toSet)
+    assertEquals(Set("bar"), controllerContext.topicsIneligibleForDeletion.toSet)
+  }
+
+  @Test
   def testBasicDeletion(): Unit = {
     val controllerContext = initContext(
       brokers = Seq(1, 2, 3),
diff --git a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
index d35b997..23ac2dc 100755
--- a/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerEpochIntegrationTest.scala
@@ -143,10 +143,10 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
           partitionStates.asJava, nodes.toSet.asJava)
 
         if (isEpochInRequestStale) {
-          sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, ApiKeys.LEADER_AND_ISR, requestBuilder)
+          sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
         }
         else {
-          sendAndVerifySuccessfulResponse(controllerChannelManager, ApiKeys.LEADER_AND_ISR, requestBuilder)
+          sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
           TestUtils.waitUntilLeaderIsKnown(Seq(broker2), tp, 10000)
         }
       }
@@ -172,10 +172,10 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
           partitionStates.asJava, liverBrokers.toSet.asJava)
 
         if (isEpochInRequestStale) {
-          sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, ApiKeys.UPDATE_METADATA, requestBuilder)
+          sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
         }
         else {
-          sendAndVerifySuccessfulResponse(controllerChannelManager, ApiKeys.UPDATE_METADATA, requestBuilder)
+          sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
           TestUtils.waitUntilMetadataIsPropagated(Seq(broker2), tp.topic(), tp.partition(), 10000)
           assertEquals(brokerId2,
             broker2.metadataCache.getPartitionInfo(tp.topic(), tp.partition()).get.basePartitionState.leader)
@@ -190,10 +190,10 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
           true, Set(tp).asJava)
 
         if (isEpochInRequestStale) {
-          sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, ApiKeys.STOP_REPLICA, requestBuilder)
+          sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager, requestBuilder)
         }
         else {
-          sendAndVerifySuccessfulResponse(controllerChannelManager, ApiKeys.STOP_REPLICA, requestBuilder)
+          sendAndVerifySuccessfulResponse(controllerChannelManager, requestBuilder)
           assertTrue(broker2.replicaManager.getPartition(tp).isEmpty)
         }
       }
@@ -221,22 +221,23 @@ class BrokerEpochIntegrationTest extends ZooKeeperTestHarness {
     }, "Broker epoch mismatches")
   }
 
-  private def sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager: ControllerChannelManager, apiKeys: ApiKeys,
-                                               builder: AbstractControlRequest.Builder[_ <: AbstractControlRequest]): Unit = {
+  private def sendAndVerifyStaleBrokerEpochInResponse(controllerChannelManager: ControllerChannelManager,
+                                                      builder: AbstractControlRequest.Builder[_ <: AbstractControlRequest]): Unit = {
     var staleBrokerEpochDetected = false
-    controllerChannelManager.sendRequest(brokerId2, apiKeys, builder,
-      response => {staleBrokerEpochDetected = response.errorCounts().containsKey(Errors.STALE_BROKER_EPOCH)})
+    controllerChannelManager.sendRequest(brokerId2, builder, response => {
+      staleBrokerEpochDetected = response.errorCounts().containsKey(Errors.STALE_BROKER_EPOCH)
+    })
     TestUtils.waitUntilTrue(() => staleBrokerEpochDetected, "Broker epoch should be stale")
     assertTrue("Stale broker epoch not detected by the broker", staleBrokerEpochDetected)
   }
 
-  private def sendAndVerifySuccessfulResponse(controllerChannelManager: ControllerChannelManager, apiKeys: ApiKeys,
+  private def sendAndVerifySuccessfulResponse(controllerChannelManager: ControllerChannelManager,
                                               builder: AbstractControlRequest.Builder[_ <: AbstractControlRequest]): Unit = {
     @volatile var succeed = false
-    controllerChannelManager.sendRequest(brokerId2, apiKeys, builder,
-      response => {
+    controllerChannelManager.sendRequest(brokerId2, builder, response => {
         succeed = response.errorCounts().isEmpty ||
-          (response.errorCounts().containsKey(Errors.NONE) && response.errorCounts().size() == 1)})
+          (response.errorCounts().containsKey(Errors.NONE) && response.errorCounts().size() == 1)
+    })
     TestUtils.waitUntilTrue(() => succeed, "Should receive response with no errors")
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 1772f32..280a5dc 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -153,8 +153,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
       val requestBuilder = new LeaderAndIsrRequest.Builder(
         ApiKeys.LEADER_AND_ISR.latestVersion, controllerId, staleControllerEpoch, servers(brokerId2).kafkaController.brokerEpoch ,partitionStates.asJava, nodes.toSet.asJava)
 
-      controllerChannelManager.sendRequest(brokerId2, ApiKeys.LEADER_AND_ISR, requestBuilder,
-        staleControllerEpochCallback)
+      controllerChannelManager.sendRequest(brokerId2, requestBuilder, staleControllerEpochCallback)
       TestUtils.waitUntilTrue(() => staleControllerEpochDetected, "Controller epoch should be stale")
       assertTrue("Stale controller epoch not detected by the broker", staleControllerEpochDetected)
     } finally {
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 7dddaab..f82801e 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -234,7 +234,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
       // Initiate a sendRequest and wait until connection is established and one byte is received by the peer
       val requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion,
         controllerId, 1, 0L, Map.empty.asJava, brokerAndEpochs.keys.map(_.node(listenerName)).toSet.asJava)
-      controllerChannelManager.sendRequest(1, ApiKeys.LEADER_AND_ISR, requestBuilder)
+      controllerChannelManager.sendRequest(1, requestBuilder)
       receiveFuture.get(10, TimeUnit.SECONDS)
 
       // Shutdown controller. Request timeout is 30s, verify that shutdown completed well before that