Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/27 15:51:29 UTC

[2/2] kafka git commit: KAFKA-5028; convert kafka controller to a single-threaded event queue model

KAFKA-5028; convert kafka controller to a single-threaded event queue model

The goal of this ticket is to improve controller maintainability by simplifying the controller's concurrency semantics. The controller code has a lot of state shared across several threads and guarded by several concurrency primitives, which makes the code hard to reason about.

This ticket proposes that we convert the controller to a single-threaded event queue model. We add a new controller thread which processes events held in an event queue. Note that this does not mean we get rid of all threads used by the controller. We merely delegate all work that interacts with controller-local state to this single thread. With only a single thread accessing and modifying the controller-local state, we no longer need to worry about concurrent access, which means we can get rid of the various concurrency primitives used throughout the controller.

Performance is expected to match existing behavior, since the bulk of the controller's work today already happens sequentially in the ZkClient's single ZkEventThread.
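
In outline, the model works as follows (a minimal, self-contained sketch with illustrative names, not the actual classes added by this patch; the real counterparts are ControllerEvent and ControllerEventThread in KafkaController.scala): listeners and callers enqueue events, and a single thread drains the queue and runs each event's process() body, so controller-local state is only ever touched from that thread.

import java.util.concurrent.LinkedBlockingQueue

// One event type: every piece of controller work becomes an event with a process() body.
trait ControllerEvent {
  def process(): Unit
}

// A single thread handles all events, so process() bodies can read and mutate
// controller-local state without locks or other concurrency primitives.
class ControllerEventLoop {
  private val queue = new LinkedBlockingQueue[ControllerEvent]()

  private val thread = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {               // the real patch uses ShutdownableThread for clean shutdown
        val event = queue.take()   // blocks until an event is enqueued
        try event.process()
        catch { case e: Throwable => Console.err.println(s"Error processing event $event: $e") }
      }
    }
  }, "controller-event-thread")

  def start(): Unit = thread.start()

  // Called from ZooKeeper listener callbacks, request handlers, scheduled tasks, etc.
  def put(event: ControllerEvent): Unit = queue.put(event)
}

// Example: a listener callback enqueues an event instead of touching controller state directly.
object EventLoopExample extends App {
  val loop = new ControllerEventLoop
  loop.start()
  loop.put(new ControllerEvent {
    override def process(): Unit = println("BrokerChange handled on the controller event thread")
  })
}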

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #2816 from onurkaraman/KAFKA-5028


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb663d04
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb663d04
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb663d04

Branch: refs/heads/trunk
Commit: bb663d04febcadd4f120e0ff5c5919ca8bf7e971
Parents: f69d941
Author: Onur Karaman <ok...@linkedin.com>
Authored: Thu Apr 27 08:51:23 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Apr 27 08:51:23 2017 -0700

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   |   80 +-
 .../kafka/controller/ControllerZkListener.scala |   62 -
 .../kafka/controller/KafkaController.scala      | 1093 +++++++++++-------
 .../controller/PartitionLeaderSelector.scala    |    4 +-
 .../controller/PartitionStateMachine.scala      |  193 +---
 .../kafka/controller/ReplicaStateMachine.scala  |   87 +-
 .../kafka/controller/TopicDeletionManager.scala |  220 ++--
 .../src/main/scala/kafka/server/KafkaApis.scala |   21 +-
 .../main/scala/kafka/server/LeaderElector.scala |   33 -
 .../kafka/server/ZookeeperLeaderElector.scala   |  158 ---
 .../test/scala/unit/kafka/admin/AdminTest.scala |   15 +-
 11 files changed, 831 insertions(+), 1135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bb663d04/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 102ebd8..c5a9678 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -28,16 +28,15 @@ import org.apache.kafka.clients._
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.requests
-import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
 import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint
+import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.{Node, TopicPartition, requests}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Set, mutable}
 import scala.collection.mutable.HashMap
+import scala.collection.{Set, mutable}
 
 class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging {
   protected val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -171,7 +170,6 @@ class RequestSendThread(val controllerId: Int,
                         name: String)
   extends ShutdownableThread(name = name) {
 
-  private val lock = new Object()
   private val stateChangeLogger = KafkaController.stateChangeLogger
   private val socketTimeoutMs = config.controllerSocketTimeoutMs
 
@@ -182,45 +180,43 @@ class RequestSendThread(val controllerId: Int,
     val QueueItem(apiKey, requestBuilder, callback) = queue.take()
     var clientResponse: ClientResponse = null
     try {
-      lock synchronized {
-        var isSendSuccessful = false
-        while (isRunning.get() && !isSendSuccessful) {
-          // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
-          // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
-          try {
-            if (!brokerReady()) {
-              isSendSuccessful = false
-              backoff()
-            }
-            else {
-              val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
-                time.milliseconds(), true)
-              clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
-              isSendSuccessful = true
-            }
-          } catch {
-            case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
-              warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
-                "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
-                  requestBuilder.toString, brokerNode.toString), e)
-              networkClient.close(brokerNode.idString)
-              isSendSuccessful = false
-              backoff()
+      var isSendSuccessful = false
+      while (isRunning.get() && !isSendSuccessful) {
+        // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
+        // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
+        try {
+          if (!brokerReady()) {
+            isSendSuccessful = false
+            backoff()
+          }
+          else {
+            val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
+              time.milliseconds(), true)
+            clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
+            isSendSuccessful = true
           }
+        } catch {
+          case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
+            warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
+              "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
+                requestBuilder.toString, brokerNode.toString), e)
+            networkClient.close(brokerNode.idString)
+            isSendSuccessful = false
+            backoff()
         }
-        if (clientResponse != null) {
-          val api = ApiKeys.forId(clientResponse.requestHeader.apiKey)
-          if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY)
-            throw new KafkaException(s"Unexpected apiKey received: $apiKey")
+      }
+      if (clientResponse != null) {
+        val api = ApiKeys.forId(clientResponse.requestHeader.apiKey)
+        if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY)
+          throw new KafkaException(s"Unexpected apiKey received: $apiKey")
 
-          val response = clientResponse.responseBody
+        val response = clientResponse.responseBody
 
-          stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
-            .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
+        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
+          .format(controllerId, controllerContext.epoch, response.toString, brokerNode.toString))
 
-          if (callback != null) {
-            callback(response)
-          }
+        if (callback != null) {
+          callback(response)
         }
       }
     } catch {
@@ -339,15 +335,15 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
         controllerContext.partitionLeadershipInfo.keySet
       else
         partitions
-      if (controller.deleteTopicManager.partitionsToBeDeleted.isEmpty)
+      if (controller.topicDeletionManager.partitionsToBeDeleted.isEmpty)
         givenPartitions
       else
-        givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
+        givenPartitions -- controller.topicDeletionManager.partitionsToBeDeleted
     }
 
     updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
     filteredPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = false))
-    controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
+    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
   }
 
   def sendRequestsToBrokers(controllerEpoch: Int) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb663d04/core/src/main/scala/kafka/controller/ControllerZkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerZkListener.scala b/core/src/main/scala/kafka/controller/ControllerZkListener.scala
deleted file mode 100644
index f7557ed..0000000
--- a/core/src/main/scala/kafka/controller/ControllerZkListener.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.controller
-
-import kafka.utils.Logging
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener}
-
-import scala.collection.JavaConverters._
-
-trait ControllerZkListener extends Logging {
-  logIdent = s"[$logName on Controller " + controller.config.brokerId + "]: "
-  protected def logName: String
-  protected def controller: KafkaController
-}
-
-trait ControllerZkChildListener extends IZkChildListener with ControllerZkListener {
-  @throws[Exception]
-  final def handleChildChange(parentPath: String, currentChildren: java.util.List[String]): Unit = {
-    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
-    if (controller.isActive)
-      doHandleChildChange(parentPath, currentChildren.asScala)
-  }
-
-  @throws[Exception]
-  def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit
-}
-
-trait ControllerZkDataListener extends IZkDataListener with ControllerZkListener {
-  @throws[Exception]
-  final def handleDataChange(dataPath: String, data: AnyRef): Unit = {
-    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
-    if (controller.isActive)
-      doHandleDataChange(dataPath, data)
-  }
-
-  @throws[Exception]
-  def doHandleDataChange(dataPath: String, data: AnyRef): Unit
-
-  @throws[Exception]
-  final def handleDataDeleted(dataPath: String): Unit = {
-    // Due to zkclient's callback order, it's possible for the callback to be triggered after the controller has moved
-    if (controller.isActive)
-      doHandleDataDeleted(dataPath)
-  }
-
-  @throws[Exception]
-  def doHandleDataDeleted(dataPath: String): Unit
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb663d04/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 9f8def1..b7c2ae4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,39 +16,34 @@
  */
 package kafka.controller
 
-import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse}
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
 
-import scala.collection._
 import com.yammer.metrics.core.{Gauge, Meter}
-import java.util.concurrent.TimeUnit
-
-import kafka.admin.AdminUtils
-import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.admin.{AdminUtils, PreferredReplicaLeaderElectionCommand}
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.common._
+import kafka.common.{TopicAndPartition, _}
 import kafka.log.LogConfig
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
+import kafka.server._
 import kafka.utils.ZkUtils._
 import kafka.utils._
-import kafka.utils.CoreUtils._
-import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
+import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, StopReplicaResponse}
 import org.apache.kafka.common.utils.Time
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
-import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
-import java.util.concurrent.locks.ReentrantLock
+import org.apache.zookeeper.Watcher.Event.KeeperState
 
-import kafka.server._
-import kafka.common.TopicAndPartition
+import scala.collection._
+import scala.util.Try
 
 class ControllerContext(val zkUtils: ZkUtils) {
   var controllerChannelManager: ControllerChannelManager = null
-  val controllerLock: ReentrantLock = new ReentrantLock()
   var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
-  val brokerShutdownLock: Object = new Object
   var epoch: Int = KafkaController.InitialControllerEpoch - 1
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
   var allTopics: Set[String] = Set.empty
@@ -152,26 +147,32 @@ object KafkaController extends Logging {
 
 class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
-  private var isRunning = true
   private val stateChangeLogger = KafkaController.stateChangeLogger
   val controllerContext = new ControllerContext(zkUtils)
   val partitionStateMachine = new PartitionStateMachine(this)
   val replicaStateMachine = new ReplicaStateMachine(this)
-  private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
-    onControllerResignation, config.brokerId, time)
   // have a separate scheduler for the controller to be able to start and stop independently of the
   // kafka server
-  private val autoRebalanceScheduler = new KafkaScheduler(1)
-  var deleteTopicManager: TopicDeletionManager = null
+  private val kafkaScheduler = new KafkaScheduler(1)
+  var topicDeletionManager: TopicDeletionManager = null
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
 
-  private val partitionReassignedListener = new PartitionsReassignedListener(this)
+  private val controllerEventQueue = new LinkedBlockingQueue[ControllerEvent]
+  private val controllerEventThread = new ControllerEventThread("controller-event-thread")
+  private val brokerChangeListener = new BrokerChangeListener(this)
+  private val topicChangeListener = new TopicChangeListener(this)
+  private val topicDeletionListener = new TopicDeletionListener(this)
+  private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
+  private val partitionReassignmentListener = new PartitionReassignmentListener(this)
   private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
   private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
+  private val activeControllerId = new AtomicInteger(-1)
+  private val offlinePartitionCount = new AtomicInteger(0)
+  private val preferredReplicaImbalanceCount = new AtomicInteger(0)
 
   newGauge(
     "ActiveControllerCount",
@@ -184,15 +185,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     "OfflinePartitionsCount",
     new Gauge[Int] {
       def value(): Int = {
-        inLock(controllerContext.controllerLock) {
-          if (!isActive)
-            0
-          else
-            controllerContext.partitionLeadershipInfo.count(p => 
-              (!controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader))
-              && (!deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic))
-            )
-        }
+        offlinePartitionCount.get()
       }
     }
   )
@@ -201,17 +194,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     "PreferredReplicaImbalanceCount",
     new Gauge[Int] {
       def value(): Int = {
-        inLock(controllerContext.controllerLock) {
-          if (!isActive)
-            0
-          else
-            controllerContext.partitionReplicaAssignment.count {
-              case (topicPartition, replicas) => 
-                (controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head 
-                && (!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic))
-                )
-            }
-        }
+        preferredReplicaImbalanceCount.get()
       }
     }
   )
@@ -232,76 +215,9 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
    * @param id Id of the broker to shutdown.
    * @return The number of partitions that the broker still leads.
    */
-  def shutdownBroker(id: Int): Set[TopicAndPartition] = {
-
-    if (!isActive) {
-      throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
-    }
-
-    controllerContext.brokerShutdownLock synchronized {
-      info("Shutting down broker " + id)
-
-      inLock(controllerContext.controllerLock) {
-        if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
-          throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
-
-        controllerContext.shuttingDownBrokerIds.add(id)
-        debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
-        debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
-      }
-
-      val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
-        inLock(controllerContext.controllerLock) {
-          controllerContext.partitionsOnBroker(id)
-            .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
-        }
-
-      allPartitionsAndReplicationFactorOnBroker.foreach {
-        case(topicAndPartition, replicationFactor) =>
-          // Move leadership serially to relinquish lock.
-          inLock(controllerContext.controllerLock) {
-            controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
-              if (replicationFactor > 1) {
-                if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
-                  // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
-                  // notifies all affected brokers
-                  partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                    controlledShutdownPartitionLeaderSelector)
-                } else {
-                  // Stop the replica first. The state change below initiates ZK changes which should take some time
-                  // before which the stop replica request should be completed (in most cases)
-                  try {
-                    brokerRequestBatch.newBatch()
-                    brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
-                      topicAndPartition.partition, deletePartition = false)
-                    brokerRequestBatch.sendRequestsToBrokers(epoch)
-                  } catch {
-                    case e : IllegalStateException => {
-                      // Resign if the controller is in an illegal state
-                      error("Forcing the controller to resign")
-                      brokerRequestBatch.clear()
-                      controllerElector.resign()
-
-                      throw e
-                    }
-                  }
-                  // If the broker is a follower, updates the isr in ZK and notifies the current leader
-                  replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
-                    topicAndPartition.partition, id)), OfflineReplica)
-                }
-              }
-            }
-          }
-      }
-      def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) {
-        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
-        controllerContext.partitionLeadershipInfo.filter {
-          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
-            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
-        }.keys
-      }
-      replicatedPartitionsBrokerLeads().toSet
-    }
+  def shutdownBroker(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit): Unit = {
+    val controlledShutdownEvent = ControlledShutdown(id, controlledShutdownCallback)
+    addToControllerEventQueue(controlledShutdownEvent)
   }
 
   /**
@@ -318,44 +234,45 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
    * This ensures another controller election will be triggered and there will always be an actively serving controller
    */
   def onControllerFailover() {
-    if(isRunning) {
-      info("Broker %d starting become controller state transition".format(config.brokerId))
-      readControllerEpochFromZookeeper()
-      incrementControllerEpoch(zkUtils.zkClient)
-
-      // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
-      registerReassignedPartitionsListener()
-      registerIsrChangeNotificationListener()
-      registerPreferredReplicaElectionListener()
-      partitionStateMachine.registerListeners()
-      replicaStateMachine.registerListeners()
-
-      initializeControllerContext()
-
-      // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
-      // are started. The is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
-      // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
-      // partitionStateMachine.startup().
-      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    info("Broker %d starting become controller state transition".format(config.brokerId))
+    readControllerEpochFromZookeeper()
+    incrementControllerEpoch(zkUtils.zkClient)
+
+    // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
+    registerPartitionReassignmentListener()
+    registerIsrChangeNotificationListener()
+    registerPreferredReplicaElectionListener()
+    registerTopicChangeListener()
+    registerTopicDeletionListener()
+    registerBrokerChangeListener()
+
+    initializeControllerContext()
+
+    // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
+    // are started. The is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
+    // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
+    // partitionStateMachine.startup().
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
 
-      replicaStateMachine.startup()
-      partitionStateMachine.startup()
-
-      // register the partition change listeners for all existing topics on failover
-      controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
-      info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
-      maybeTriggerPartitionReassignment()
-      maybeTriggerPreferredReplicaElection()
-      if (config.autoLeaderRebalanceEnable) {
-        info("starting the partition rebalance scheduler")
-        autoRebalanceScheduler.startup()
-        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
-          5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
-      }
-      deleteTopicManager.start()
+    replicaStateMachine.startup()
+    partitionStateMachine.startup()
+
+    // register the partition change listeners for all existing topics on failover
+    controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
+    info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
+    maybeTriggerPartitionReassignment()
+    maybeTriggerPreferredReplicaElection()
+    info("starting the controller scheduler")
+    kafkaScheduler.startup()
+    if (config.autoLeaderRebalanceEnable) {
+      scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
     }
-    else
-      info("Controller has been shut down, aborting startup/failover")
+    topicDeletionManager.start()
+  }
+
+  private def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = {
+    kafkaScheduler.schedule("auto-leader-rebalance-task", () => addToControllerEventQueue(AutoPreferredReplicaLeaderElection),
+      delay = delay, unit = unit)
   }
 
   /**
@@ -367,46 +284,46 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     debug("Controller resigning, broker id %d".format(config.brokerId))
     // de-register listeners
     deregisterIsrChangeNotificationListener()
-    deregisterReassignedPartitionsListener()
+    deregisterPartitionReassignmentListener()
     deregisterPreferredReplicaElectionListener()
 
     // shutdown delete topic manager
-    if (deleteTopicManager != null)
-      deleteTopicManager.shutdown()
+    if (topicDeletionManager != null)
+      topicDeletionManager.shutdown()
 
     // shutdown leader rebalance scheduler
-    if (config.autoLeaderRebalanceEnable)
-      autoRebalanceScheduler.shutdown()
-
-    inLock(controllerContext.controllerLock) {
-      // de-register partition ISR listener for on-going partition reassignment task
-      deregisterReassignedPartitionsIsrChangeListeners()
-      // shutdown partition state machine
-      partitionStateMachine.shutdown()
-      // shutdown replica state machine
-      replicaStateMachine.shutdown()
-      // shutdown controller channel manager
-      if(controllerContext.controllerChannelManager != null) {
-        controllerContext.controllerChannelManager.shutdown()
-        controllerContext.controllerChannelManager = null
-      }
-      // reset controller context
-      controllerContext.epoch=0
-      controllerContext.epochZkVersion=0
-      brokerState.newState(RunningAsBroker)
-
-      info("Broker %d resigned as the controller".format(config.brokerId))
+    kafkaScheduler.shutdown()
+    offlinePartitionCount.set(0)
+    preferredReplicaImbalanceCount.set(0)
+
+    // de-register partition ISR listener for on-going partition reassignment task
+    deregisterPartitionReassignmentIsrChangeListeners()
+    // shutdown partition state machine
+    partitionStateMachine.shutdown()
+    deregisterTopicChangeListener()
+    partitionModificationsListeners.keys.foreach(deregisterPartitionModificationsListener)
+    deregisterTopicDeletionListener()
+    // shutdown replica state machine
+    replicaStateMachine.shutdown()
+    deregisterBrokerChangeListener()
+
+    // shutdown controller channel manager
+    if(controllerContext.controllerChannelManager != null) {
+      controllerContext.controllerChannelManager.shutdown()
+      controllerContext.controllerChannelManager = null
     }
+    // reset controller context
+    controllerContext.epoch=0
+    controllerContext.epochZkVersion=0
+    brokerState.newState(RunningAsBroker)
+
+    info("Broker %d resigned as the controller".format(config.brokerId))
   }
 
   /**
    * Returns true if this broker is the current controller.
    */
-  def isActive: Boolean = {
-    inLock(controllerContext.controllerLock) {
-      controllerContext.controllerChannelManager != null
-    }
-  }
+  def isActive: Boolean = activeControllerId.get() == config.brokerId
 
   /**
    * This callback is invoked by the replica state machine's broker change listener, with the list of newly started
@@ -444,12 +361,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
     // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
     // on the newly restarted brokers, there is a chance that topic deletion can resume
-    val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
+    val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
     if(replicasForTopicsToBeDeleted.nonEmpty) {
       info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
         "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
-        deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
-      deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
+        topicDeletionManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
+      topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
     }
   }
 
@@ -474,22 +391,22 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
       deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
-        !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
+        !topicDeletionManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // filter out the replicas that belong to topics that are being deleted
     var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
-    val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
+    val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
     // handle dead replicas
     replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
     // check if topic deletion state for the dead replicas needs to be updated
-    val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
+    val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
     if(replicasForTopicsToBeDeleted.nonEmpty) {
       // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
       // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
       // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
-      deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
+      topicDeletionManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
     }
 
     // If broker failure did not require leader re-election, inform brokers of failed broker
@@ -509,7 +426,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
     info("New topic creation callback for %s".format(newPartitions.mkString(",")))
     // subscribe to partition changes
-    topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
+    topics.foreach(topic => registerPartitionModificationsListener(topic))
     onNewPartitionCreation(newPartitions)
   }
 
@@ -608,7 +525,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
       //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
       // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
-      deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
+      topicDeletionManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
     }
   }
 
@@ -616,8 +533,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
                                                     partition: Int,
                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val isrChangeListener = new ReassignedPartitionsIsrChangeListener(this, topic, partition,
-      reassignedReplicas.toSet)
+    val isrChangeListener = new PartitionReassignmentIsrChangeListener(this, topic, partition, reassignedReplicas.toSet)
     reassignedPartitionContext.isrChangeListener = isrChangeListener
     // register listener on the leader and isr path to wait until they catch up with the current leader
     zkUtils.zkClient.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
@@ -641,7 +557,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
             watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
             controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
             // mark topic ineligible for deletion for the partitions being reassigned
-            deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
+            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
             onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
           }
         case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
@@ -658,13 +574,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
-      deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
+      topicDeletionManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
       partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
       removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
-      deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
     }
   }
 
@@ -674,13 +589,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
    * elector
    */
   def startup() = {
-    inLock(controllerContext.controllerLock) {
-      info("Controller starting up")
-      registerSessionExpirationListener()
-      isRunning = true
-      controllerElector.startup
-      info("Controller startup complete")
-    }
+    addToControllerEventQueue(Startup)
+    controllerEventThread.start()
   }
 
   /**
@@ -689,9 +599,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
    * shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
    */
   def shutdown() = {
-    inLock(controllerContext.controllerLock) {
-      isRunning = false
-    }
+    controllerEventThread.shutdown()
     onControllerResignation()
   }
 
@@ -732,7 +640,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
   }
 
   private def registerSessionExpirationListener() = {
-    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener())
+    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this))
+  }
+
+  private def registerControllerChangeListener() = {
+    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this))
   }
 
   private def initializeControllerContext() {
@@ -803,7 +715,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(",")))
     info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
     // initialize the topic deletion manager
-    deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, topicsIneligibleForDeletion)
+    topicDeletionManager = new TopicDeletionManager(this, topicsQueuedForDeletion, topicsIneligibleForDeletion)
   }
 
   private def maybeTriggerPartitionReassignment() {
@@ -912,7 +824,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
             // Resign if the controller is in an illegal state
             error("Forcing the controller to resign")
             brokerRequestBatch.clear()
-            controllerElector.resign()
+            triggerControllerMove()
 
             throw e
           }
@@ -927,6 +839,48 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     }
   }
 
+  private def registerBrokerChangeListener() = {
+    zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+  }
+
+  private def deregisterBrokerChangeListener() = {
+    zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+  }
+
+  private def registerTopicChangeListener() = {
+    zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
+  }
+
+  private def deregisterTopicChangeListener() = {
+    zkUtils.zkClient.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener)
+  }
+
+  def registerPartitionModificationsListener(topic: String) = {
+    partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, topic))
+    zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
+  }
+
+  def deregisterPartitionModificationsListener(topic: String) = {
+    zkUtils.zkClient.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
+    partitionModificationsListeners.remove(topic)
+  }
+
+  private def registerTopicDeletionListener() = {
+    zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
+  }
+
+  private def deregisterTopicDeletionListener() = {
+    zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
+  }
+
+  private def registerPartitionReassignmentListener() = {
+    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
+  }
+
+  private def deregisterPartitionReassignmentListener() = {
+    zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
+  }
+
   private def registerIsrChangeNotificationListener() = {
     debug("Registering IsrChangeNotificationListener")
     zkUtils.zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
@@ -937,14 +891,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     zkUtils.zkClient.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
   }
 
-  private def registerReassignedPartitionsListener() = {
-    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
-  }
-
-  private def deregisterReassignedPartitionsListener() = {
-    zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
-  }
-
   private def registerPreferredReplicaElectionListener() {
     zkUtils.zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
   }
@@ -953,7 +899,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     zkUtils.zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
   }
 
-  private def deregisterReassignedPartitionsIsrChangeListeners() {
+  private def deregisterPartitionReassignmentIsrChangeListeners() {
     controllerContext.partitionsBeingReassigned.foreach {
       case (topicAndPartition, reassignedPartitionsContext) =>
         val zkPartitionPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
@@ -1033,7 +979,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
         // Resign if the controller is in an illegal state
         error("Forcing the controller to resign")
         brokerRequestBatch.clear()
-        controllerElector.resign()
+        triggerControllerMove()
 
         throw e
       }
@@ -1153,154 +1099,206 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     finalLeaderIsrAndControllerEpoch
   }
 
-  class SessionExpirationListener() extends IZkStateListener with Logging {
-    this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
+  private def checkAndTriggerPartitionRebalance(): Unit = {
+    trace("checking need to trigger partition rebalance")
+    var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = controllerContext.partitionReplicaAssignment
+      .filterNot(p => topicDeletionManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
+        case (_, assignedReplicas) => assignedReplicas.head
+      }
+    debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
+    // for each broker, check if a preferred replica election needs to be triggered
+    preferredReplicasForTopicsByBrokers.foreach {
+      case(leaderBroker, topicAndPartitionsForBroker) => {
+        var imbalanceRatio: Double = 0
+        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = topicAndPartitionsForBroker
+          .filter { case (topicPartition, _) =>
+          controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
+            controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
+        }
+        debug("topics not in preferred replica " + topicsNotInPreferredReplica)
+        val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
+        val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
+        imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
+        trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
+        // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
+        // that need to be on this broker
+        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
+          topicsNotInPreferredReplica.keys.foreach { topicPartition =>
+            // do this check only if the broker is live and there are no partitions being reassigned currently
+            // and preferred replica election is not in progress
+            if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
+                controllerContext.partitionsBeingReassigned.isEmpty &&
+                controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
+                !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+                controllerContext.allTopics.contains(topicPartition.topic)) {
+              onPreferredReplicaElection(Set(topicPartition), true)
+            }
+          }
+        }
+      }
+    }
+  }
 
-    def handleStateChanged(state: KeeperState) {
-      // do nothing, since zkclient will do reconnect for us.
+  def getControllerID(): Int = {
+    controllerContext.zkUtils.readDataMaybeNull(ZkUtils.ControllerPath)._1 match {
+      case Some(controller) => KafkaController.parseControllerId(controller)
+      case None => -1
     }
+  }
 
-    /**
-     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
-     * any ephemeral nodes here.
-     *
-     * @throws Exception On any error.
-     */
-    @throws[Exception]
-    def handleNewSession() {
-      info("ZK expired; shut down all controller components and try to re-elect")
-      if (controllerElector.getControllerID() != config.brokerId) {
-        onControllerResignation()
-        inLock(controllerContext.controllerLock) {
-          controllerElector.elect
+  def addToControllerEventQueue(controllerEvent: ControllerEvent): Unit = {
+    controllerEventQueue.put(controllerEvent)
+  }
+
+  class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
+    override def doWork(): Unit = {
+      val controllerEvent = controllerEventQueue.take()
+      try {
+        controllerEvent.process()
+      } catch {
+        case e: Throwable => error("Error processing event " + controllerEvent, e)
+      }
+      updateMetrics()
+    }
+  }
+
+  case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent {
+    override def process(): Unit = {
+      if (!isActive) return
+      ControllerStats.leaderElectionTimer.time {
+        try {
+          val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
+          val curBrokerIds = curBrokers.map(_.id)
+          val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
+          val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
+          val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
+          val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
+          controllerContext.liveBrokers = curBrokers
+          val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
+          val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
+          val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
+          info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
+            .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
+          newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
+          deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
+          if(newBrokerIds.nonEmpty)
+            onBrokerStartup(newBrokerIdsSorted)
+          if(deadBrokerIds.nonEmpty)
+            onBrokerFailure(deadBrokerIdsSorted)
+        } catch {
+          case e: Throwable => error("Error while handling broker changes", e)
         }
-      } else {
-        // This can happen when there are multiple consecutive session expiration and handleNewSession() are called multiple
-        // times. The first call may already register the controller path using the newest ZK session. Therefore, the
-        // controller path will exist in subsequent calls to handleNewSession().
-        info("ZK expired, but the current controller id %d is the same as this broker id, skip re-elect".format(config.brokerId))
       }
     }
+  }
 
-    def handleSessionEstablishmentError(error: Throwable): Unit = {
-      //no-op handleSessionEstablishmentError in KafkaHealthCheck should handle this error in its handleSessionEstablishmentError
+  case class TopicChange(topics: Set[String]) extends ControllerEvent {
+    override def process(): Unit = {
+      if (!isActive) return
+      try {
+        val newTopics = topics -- controllerContext.allTopics
+        val deletedTopics = controllerContext.allTopics -- topics
+        controllerContext.allTopics = topics
+
+        val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
+        controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
+          !deletedTopics.contains(p._1.topic))
+        controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
+        info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
+          deletedTopics, addedPartitionReplicaAssignment))
+        if (newTopics.nonEmpty)
+          onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
+      } catch {
+        case e: Throwable => error("Error while handling new topic", e)
+      }
     }
   }
 
-  private def checkAndTriggerPartitionRebalance(): Unit = {
-    if (isActive) {
-      trace("checking need to trigger partition rebalance")
-      // get all the active brokers
-      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
-      inLock(controllerContext.controllerLock) {
-        preferredReplicasForTopicsByBrokers =
-          controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
-            case (_, assignedReplicas) => assignedReplicas.head
+  case class PartitionModifications(topic: String) extends ControllerEvent {
+    override def process(): Unit = {
+      if (!isActive) return
+      try {
+        val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
+        val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
+          !controllerContext.partitionReplicaAssignment.contains(p._1))
+        if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
+          error("Skipping adding partitions %s for topic %s since it is currently being deleted"
+            .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+        else {
+          if (partitionsToBeAdded.nonEmpty) {
+            info("New partitions to be added %s".format(partitionsToBeAdded))
+            controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
+            onNewPartitionCreation(partitionsToBeAdded.keySet)
           }
+        }
+      } catch {
+        case e: Throwable => error("Error while handling add partitions for topic " + topic, e)
       }
-      debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
-      // for each broker, check if a preferred replica election needs to be triggered
-      preferredReplicasForTopicsByBrokers.foreach {
-        case(leaderBroker, topicAndPartitionsForBroker) => {
-          var imbalanceRatio: Double = 0
-          var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
-          inLock(controllerContext.controllerLock) {
-            topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
-              controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
-                controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
-            }
-            debug("topics not in preferred replica " + topicsNotInPreferredReplica)
-            val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
-            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
-            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
-            trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
-          }
-          // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
-          // that need to be on this broker
-          if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-            topicsNotInPreferredReplica.keys.foreach { topicPartition =>
-              inLock(controllerContext.controllerLock) {
-                // do this check only if the broker is live and there are no partitions being reassigned currently
-                // and preferred replica election is not in progress
-                if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                    controllerContext.partitionsBeingReassigned.isEmpty &&
-                    controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
-                    !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
-                    controllerContext.allTopics.contains(topicPartition.topic)) {
-                  onPreferredReplicaElection(Set(topicPartition), true)
-                }
-              }
-            }
+    }
+  }
+
+  case class TopicDeletion(var topicsToBeDeleted: Set[String]) extends ControllerEvent {
+    override def process(): Unit = {
+      if (!isActive) return
+      debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
+      val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
+      if (nonExistentTopics.nonEmpty) {
+        warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
+        nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
+      }
+      topicsToBeDeleted --= nonExistentTopics
+      if (config.deleteTopicEnable) {
+        if (topicsToBeDeleted.nonEmpty) {
+          info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
+          // mark topic ineligible for deletion if other state changes are in progress
+          topicsToBeDeleted.foreach { topic =>
+            val preferredReplicaElectionInProgress =
+              controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
+            val partitionReassignmentInProgress =
+              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+            if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
+              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
           }
+          // add topic to deletion list
+          topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
+        }
+      } else {
+        // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
+        for (topic <- topicsToBeDeleted) {
+          info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
+          zkUtils.zkClient.delete(getDeleteTopicPath(topic))
         }
       }
     }
   }
-}
 
-/**
- * Starts the partition reassignment process unless -
- * 1. Partition previously existed
- * 2. New replicas are the same as existing replicas
- * 3. Any replica in the new set of replicas are dead
- * If any of the above conditions are satisfied, it logs an error and removes the partition from list of reassigned
- * partitions.
- */
-class PartitionsReassignedListener(protected val controller: KafkaController) extends ControllerZkDataListener {
-  private val controllerContext = controller.controllerContext
-
-  protected def logName = "PartitionsReassignedListener"
-
-  /**
-   * Invoked when some partitions are reassigned by the admin command
-   *
-   * @throws Exception On any error.
-   */
-  @throws[Exception]
-  def doHandleDataChange(dataPath: String, data: AnyRef) {
-    debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
-      .format(dataPath, data))
-    val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
-    val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
-      partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
-    }
-    partitionsToBeReassigned.foreach { partitionToBeReassigned =>
-      inLock(controllerContext.controllerLock) {
-        if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
+  case class PartitionReassignment(partitionReassignment: Map[TopicAndPartition, Seq[Int]]) extends ControllerEvent {
+    override def process(): Unit = {
+      if (!isActive) return
+      val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
+      partitionsToBeReassigned.foreach { partitionToBeReassigned =>
+        if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
           error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
             .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
-          controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
+          removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
         } else {
-          val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
-          controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+          val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
+          initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
         }
       }
+
     }
   }
 
-  def doHandleDataDeleted(dataPath: String) {}
-}
-
-class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaController, topic: String, partition: Int,
-                                            reassignedReplicas: Set[Int]) extends ControllerZkDataListener {
-  private val zkUtils = controller.controllerContext.zkUtils
-  private val controllerContext = controller.controllerContext
-
-  protected def logName = "ReassignedPartitionsIsrChangeListener"
-
-  /**
-   * Invoked when some partitions need to move leader to preferred replica
-   */
-  def doHandleDataChange(dataPath: String, data: AnyRef) {
-    inLock(controllerContext.controllerLock) {
-      debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
-      val topicAndPartition = TopicAndPartition(topic, partition)
+  case class PartitionReassignmentIsrChange(topicAndPartition: TopicAndPartition, reassignedReplicas: Set[Int]) extends ControllerEvent {
+    override def process(): Unit = {
+      if (!isActive) return
       try {
         // check if this partition is still being reassigned or not
         controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
           case Some(reassignedPartitionContext) =>
             // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
-            val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
+            val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
             newLeaderAndIsrOpt match {
               case Some(leaderAndIsr) => // check if new replicas have joined ISR
                 val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
@@ -1309,7 +1307,7 @@ class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaContr
                   info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
                     .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
                     "Resuming partition reassignment")
-                  controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
+                  onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
                 }
                 else {
                   info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
@@ -1327,69 +1325,340 @@ class ReassignedPartitionsIsrChangeListener(protected val controller: KafkaContr
     }
   }
 
-  def doHandleDataDeleted(dataPath: String) {}
-
-}
-
-/**
- * Called when leader intimates of isr change
- *
- * @param controller
- */
-class IsrChangeNotificationListener(protected val controller: KafkaController) extends ControllerZkChildListener {
-
-  protected def logName = "IsrChangeNotificationListener"
-
-  def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = {
-    inLock(controller.controllerContext.controllerLock) {
-      debug("ISR change notification listener fired")
+  case class IsrChangeNotification(sequenceNumbers: Seq[String]) extends ControllerEvent {
+    override def process(): Unit = {
+      if (!isActive) return
       try {
-        val topicAndPartitions = currentChildren.flatMap(getTopicAndPartition).toSet
+        val topicAndPartitions = sequenceNumbers.flatMap(getTopicAndPartition).toSet
         if (topicAndPartitions.nonEmpty) {
-          controller.updateLeaderAndIsrCache(topicAndPartitions)
+          updateLeaderAndIsrCache(topicAndPartitions)
           processUpdateNotifications(topicAndPartitions)
         }
       } finally {
-        // delete processed children
-        currentChildren.map(x => controller.controllerContext.zkUtils.deletePath(
-          ZkUtils.IsrChangeNotificationPath + "/" + x))
+        // delete the notifications
+        sequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
+      }
+    }
+
+    private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
+      val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
+      debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
+      sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
+    }
+
+    private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
+      val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
+      val (jsonOpt, _) = controllerContext.zkUtils.readDataMaybeNull(changeZnode)
+      if (jsonOpt.isDefined) {
+        val json = Json.parseFull(jsonOpt.get)
+
+        json match {
+          case Some(m) =>
+            val topicAndPartitions: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
+            val isrChanges = m.asInstanceOf[Map[String, Any]]
+            val topicAndPartitionList = isrChanges("partitions").asInstanceOf[List[Any]]
+            topicAndPartitionList.foreach {
+              case tp =>
+                val topicAndPartition = tp.asInstanceOf[Map[String, Any]]
+                val topic = topicAndPartition("topic").asInstanceOf[String]
+                val partition = topicAndPartition("partition").asInstanceOf[Int]
+                topicAndPartitions += TopicAndPartition(topic, partition)
+            }
+            topicAndPartitions
+          case None =>
+            error("Invalid topic and partition JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
+            Set.empty
+        }
+      } else {
+        Set.empty
+      }
+    }
+
+  }
+
+  case class PreferredReplicaLeaderElection(partitionsForPreferredReplicaElection: Set[TopicAndPartition]) extends ControllerEvent {
+    override def process(): Unit = {
+      if (!isActive) return
+      if (controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
+        info("These partitions are already undergoing preferred replica election: %s"
+          .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+      val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+      val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
+      if (partitionsForTopicsToBeDeleted.nonEmpty) {
+        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
+          .format(partitionsForTopicsToBeDeleted))
       }
+      onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
     }
   }
 
-  private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
-    val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
-    debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
-    controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
+  case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
+    override def process(): Unit = {
+      if (!isActive) return
+      try {
+        checkAndTriggerPartitionRebalance()
+      } finally {
+        scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
+      }
+    }
   }
 
-  private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
-    val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
-    val (jsonOpt, _) = controller.controllerContext.zkUtils.readDataMaybeNull(changeZnode)
-    if (jsonOpt.isDefined) {
-      val json = Json.parseFull(jsonOpt.get)
+  case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
+    override def process(): Unit = {
+      val controlledShutdownResult = Try { doControlledShutdown(id) }
+      controlledShutdownCallback(controlledShutdownResult)
+    }
 
-      json match {
-        case Some(m) =>
-          val topicAndPartitions: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
-          val isrChanges = m.asInstanceOf[Map[String, Any]]
-          val topicAndPartitionList = isrChanges("partitions").asInstanceOf[List[Any]]
-          topicAndPartitionList.foreach {
-            case tp =>
-              val topicAndPartition = tp.asInstanceOf[Map[String, Any]]
-              val topic = topicAndPartition("topic").asInstanceOf[String]
-              val partition = topicAndPartition("partition").asInstanceOf[Int]
-              topicAndPartitions += TopicAndPartition(topic, partition)
+    private def doControlledShutdown(id: Int): Set[TopicAndPartition] = {
+      if (!isActive) {
+        throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
+      }
+
+      info("Shutting down broker " + id)
+
+      if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
+        throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+
+      controllerContext.shuttingDownBrokerIds.add(id)
+      debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
+      debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
+
+      val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
+          controllerContext.partitionsOnBroker(id)
+            .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
+
+      allPartitionsAndReplicationFactorOnBroker.foreach {
+        case(topicAndPartition, replicationFactor) =>
+          controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
+            if (replicationFactor > 1) {
+              if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
+                // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
+                // notifies all affected brokers
+                partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
+                  controlledShutdownPartitionLeaderSelector)
+              } else {
+                // Stop the replica first. The state change below initiates ZK changes, which should take some time;
+                // in most cases the stop replica request should complete before then.
+                try {
+                  brokerRequestBatch.newBatch()
+                  brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
+                    topicAndPartition.partition, deletePartition = false)
+                  brokerRequestBatch.sendRequestsToBrokers(epoch)
+                } catch {
+                  case e : IllegalStateException => {
+                    // Resign if the controller is in an illegal state
+                    error("Forcing the controller to resign")
+                    brokerRequestBatch.clear()
+                    triggerControllerMove()
+
+                    throw e
+                  }
+                }
+                // If the broker is a follower, updates the isr in ZK and notifies the current leader
+                replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
+                  topicAndPartition.partition, id)), OfflineReplica)
+              }
+            }
           }
-          topicAndPartitions
-        case None =>
-          error("Invalid topic and partition JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
-          Set.empty
       }
-    } else {
-      Set.empty
+      def replicatedPartitionsBrokerLeads() = {
+        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+        controllerContext.partitionLeadershipInfo.filter {
+          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
+            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+        }.keys
+      }
+      replicatedPartitionsBrokerLeads().toSet
+    }
+  }
+
+  case class TopicDeletionStopReplicaResult(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
+    override def process(): Unit = {
+      import JavaConverters._
+      if (!isActive) return
+      val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
+      debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
+      val responseMap = stopReplicaResponse.responses.asScala
+      val partitionsInError =
+        if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
+        else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
+      val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
+      // move all the failed replicas to ReplicaDeletionIneligible
+      topicDeletionManager.failReplicaDeletion(replicasInError)
+      if (replicasInError.size != responseMap.size) {
+        // some replicas could have been successfully deleted
+        val deletedReplicas = responseMap.keySet -- partitionsInError
+        topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
+      }
+    }
+  }
+
+  case object Startup extends ControllerEvent {
+    override def process(): Unit = {
+      registerSessionExpirationListener()
+      registerControllerChangeListener()
+      elect()
+    }
+  }
+
+  case class ControllerChange(newControllerId: Int) extends ControllerEvent {
+    override def process(): Unit = {
+      val wasActiveBeforeChange = isActive
+      activeControllerId.set(newControllerId)
+      if (wasActiveBeforeChange && !isActive) {
+        onControllerResignation()
+      }
+    }
+  }
+
+  case object Reelect extends ControllerEvent {
+    override def process(): Unit = {
+      val wasActiveBeforeChange = isActive
+      activeControllerId.set(getControllerID())
+      if (wasActiveBeforeChange && !isActive) {
+        onControllerResignation()
+      }
+      elect()
     }
   }
+
+  private def updateMetrics(): Unit = {
+    val opc = if (!isActive)
+      0
+    else
+      controllerContext.partitionLeadershipInfo.count(p =>
+        !controllerContext.liveOrShuttingDownBrokerIds.contains(p._2.leaderAndIsr.leader) &&
+          !topicDeletionManager.isTopicQueuedUpForDeletion(p._1.topic)
+      )
+    offlinePartitionCount.set(opc)
+
+    val pric = if (!isActive)
+      0
+    else
+      controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
+        controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
+        controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != replicas.head &&
+          !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic)
+      }
+    preferredReplicaImbalanceCount.set(pric)
+  }
+
+  private def triggerControllerMove(): Unit = {
+    activeControllerId.set(-1)
+    controllerContext.zkUtils.deletePath(ZkUtils.ControllerPath)
+  }
+
+  def elect(): Unit = {
+    val timestamp = time.milliseconds
+    val electString = ZkUtils.controllerZkData(config.brokerId, timestamp)
+
+    activeControllerId.set(getControllerID())
+    /*
+     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
+     * it's possible that the controller has already been elected when we get here. This check will prevent the following
+     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
+     */
+    if(activeControllerId.get() != -1) {
+      debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId.get()))
+      return
+    }
+
+    try {
+      val zkCheckedEphemeral = new ZKCheckedEphemeral(ZkUtils.ControllerPath,
+                                                      electString,
+                                                      controllerContext.zkUtils.zkConnection.getZookeeper,
+                                                      controllerContext.zkUtils.isSecure)
+      zkCheckedEphemeral.create()
+      info(config.brokerId + " successfully elected as the controller")
+      activeControllerId.set(config.brokerId)
+      onControllerFailover()
+    } catch {
+      case _: ZkNodeExistsException =>
+        // If someone else has written the path, re-read who the active controller is
+        activeControllerId.set(getControllerID)
+
+        if (activeControllerId.get() != -1)
+          debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId.get(), config.brokerId))
+        else
+          warn("A controller has been elected but just resigned, this will result in another round of election")
+
+      case e2: Throwable =>
+        error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
+        triggerControllerMove()
+    }
+  }
+}
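
For reference on elect() above: it writes an ephemeral /controller znode via ZkUtils.controllerZkData, and ControllerChangeListener (further down) reads the owner back out with KafkaController.parseControllerId. A payload roughly of that shape, with made-up broker id and timestamp values, would be:

// Illustrative only: the kind of JSON carried by the ephemeral /controller znode
// written by elect() and parsed by KafkaController.parseControllerId (values are made up).
val controllerZnodeJson = """{"version":1,"brokerid":1,"timestamp":"1493305883688"}"""
val controllerId = KafkaController.parseControllerId(controllerZnodeJson) // 1
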
+
+/**
+  * This is the zookeeper listener that triggers the replica state transitions when the set of live brokers changes
+  */
+class BrokerChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
+  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
+    import JavaConverters._
+    controller.addToControllerEventQueue(controller.BrokerChange(currentChilds.asScala))
+  }
+}
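
BrokerChangeListener above, like the rest of the listeners in this file, no longer reads or mutates controller state on the ZooKeeper callback thread; it only wraps the callback arguments in a ControllerEvent and enqueues it. The registration side is outside this hunk; as a rough illustration (the helper below is an assumption, not part of this commit), a child-change listener of this shape is wired up through the plain ZkClient subscription API:

import kafka.utils.ZkUtils

// Sketch only (assumed helper, not part of this commit): subscribe the broker-change
// listener on /brokers/ids so that child changes become BrokerChange events on the queue.
def registerBrokerChangeListener(controller: KafkaController, zkUtils: ZkUtils): Unit =
  zkUtils.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener(controller))
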
+
+class TopicChangeListener(controller: KafkaController) extends IZkChildListener with Logging {
+  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
+    import JavaConverters._
+    controller.addToControllerEventQueue(controller.TopicChange(currentChilds.asScala.toSet))
+  }
+}
+
+class PartitionModificationsListener(controller: KafkaController, topic: String) extends IZkDataListener with Logging {
+  override def handleDataChange(dataPath: String, data: Any): Unit = {
+    controller.addToControllerEventQueue(controller.PartitionModifications(topic))
+  }
+
+  override def handleDataDeleted(dataPath: String): Unit = {}
+}
+
+/**
+  * Topic deletion includes the following operations -
+  * 1. Add the topic to be deleted to the delete topics cache, only if the topic exists
+  * 2. If there are topics to be deleted, signal the delete topic thread
+  */
+class TopicDeletionListener(controller: KafkaController) extends IZkChildListener with Logging {
+  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
+    import JavaConverters._
+    controller.addToControllerEventQueue(controller.TopicDeletion(currentChilds.asScala.toSet))
+  }
+}
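
The TopicDeletion event earlier in this file shows the controller-side handling; on the request side, asking for a deletion only means creating a child znode under /admin/delete_topics, which is what makes this child listener fire. A minimal sketch, assuming the standard ZkUtils helpers (not part of this diff):

import kafka.utils.ZkUtils

// Sketch only: request deletion of a topic by creating /admin/delete_topics/<topic>,
// using the existing ZkUtils helpers getDeleteTopicPath and createPersistentPath.
def requestTopicDeletion(zkUtils: ZkUtils, topic: String): Unit =
  zkUtils.createPersistentPath(ZkUtils.getDeleteTopicPath(topic))
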
+
+/**
+ * Starts the partition reassignment process unless -
+ * 1. The partition previously existed
+ * 2. The new replicas are the same as the existing replicas
+ * 3. Any replica in the new set of replicas is dead
+ * If any of the above conditions is satisfied, it logs an error and removes the partition from the list of reassigned
+ * partitions.
+ */
+class PartitionReassignmentListener(controller: KafkaController) extends IZkDataListener with Logging {
+  override def handleDataChange(dataPath: String, data: Any): Unit = {
+    val partitionReassignment = ZkUtils.parsePartitionReassignmentData(data.toString)
+    controller.addToControllerEventQueue(controller.PartitionReassignment(partitionReassignment))
+  }
+
+  override def handleDataDeleted(dataPath: String): Unit = {}
+}
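
For reference, the data this listener hands to ZkUtils.parsePartitionReassignmentData is the JSON that the reassignment tool writes to /admin/reassign_partitions; a minimal example with made-up topic and replica values:

import kafka.utils.ZkUtils

// Illustrative values only: the reassignment payload parsed by PartitionReassignmentListener.
val reassignmentJson =
  """{"version":1,"partitions":[{"topic":"foo","partition":0,"replicas":[1,2,3]}]}"""
val reassignment = ZkUtils.parsePartitionReassignmentData(reassignmentJson)
// yields Map(TopicAndPartition("foo", 0) -> Seq(1, 2, 3)), the shape PartitionReassignment expects
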
+
+class PartitionReassignmentIsrChangeListener(controller: KafkaController, topic: String, partition: Int, reassignedReplicas: Set[Int]) extends IZkDataListener with Logging {
+  override def handleDataChange(dataPath: String, data: Any): Unit = {
+    controller.addToControllerEventQueue(controller.PartitionReassignmentIsrChange(TopicAndPartition(topic, partition), reassignedReplicas))
+  }
+
+  override def handleDataDeleted(dataPath: String): Unit = {}
+}
+
+/**
+ * Called when a replica leader initiates an ISR change
+ */
+class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
+  override def handleChildChange(parentPath: String, currentChilds: java.util.List[String]): Unit = {
+    import JavaConverters._
+    controller.addToControllerEventQueue(controller.IsrChangeNotification(currentChilds.asScala))
+  }
 }
 
 object IsrChangeNotificationListener {
@@ -1400,40 +1669,48 @@ object IsrChangeNotificationListener {
  * Starts the preferred replica leader election for the list of partitions specified under
  * /admin/preferred_replica_election -
  */
-class PreferredReplicaElectionListener(protected val controller: KafkaController) extends ControllerZkDataListener {
-  private val controllerContext = controller.controllerContext
+class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
+  override def handleDataChange(dataPath: String, data: Any): Unit = {
+    val partitions = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
+    controller.addToControllerEventQueue(controller.PreferredReplicaLeaderElection(partitions))
+  }
 
-  protected def logName = "PreferredReplicaElectionListener"
+  override def handleDataDeleted(dataPath: String): Unit = {}
+}
+
+class ControllerChangeListener(controller: KafkaController) extends IZkDataListener {
+  override def handleDataChange(dataPath: String, data: Any): Unit = {
+    controller.addToControllerEventQueue(controller.ControllerChange(KafkaController.parseControllerId(data.toString)))
+  }
+
+  override def handleDataDeleted(dataPath: String): Unit = {
+    controller.addToControllerEventQueue(controller.Reelect)
+  }
+}
+
+class SessionExpirationListener(controller: KafkaController) extends IZkStateListener with Logging {
+  override def handleStateChanged(state: KeeperState) {
+    // do nothing, since zkclient will do reconnect for us.
+  }
 
   /**
-   * Invoked when some partitions are reassigned by the admin command
-   *
-   * @throws Exception On any error.
-   */
+    * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+    * any ephemeral nodes here.
+    *
+    * @throws Exception On any error.
+    */
   @throws[Exception]
-  def doHandleDataChange(dataPath: String, data: AnyRef) {
-    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
-            .format(dataPath, data.toString))
-    inLock(controllerContext.controllerLock) {
-      val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
-      if (controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
-        info("These partitions are already undergoing preferred replica election: %s"
-          .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
-      val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
-      val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
-      if (partitionsForTopicsToBeDeleted.nonEmpty) {
-        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
-          .format(partitionsForTopicsToBeDeleted))
-      }
-      controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
-    }
+  override def handleNewSession(): Unit = {
+    controller.addToControllerEventQueue(controller.Reelect)
   }
 
-  def doHandleDataDeleted(dataPath: String) {}
+  override def handleSessionEstablishmentError(error: Throwable): Unit = {
+    // no-op; handleSessionEstablishmentError in KafkaHealthCheck should handle this error
+  }
 }
 
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
-                                       var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
+                                       var isrChangeListener: PartitionReassignmentIsrChangeListener = null)
 
 case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
   override def toString: String = {
@@ -1463,3 +1740,7 @@ object ControllerStats extends KafkaMetricsGroup {
 
   def leaderElectionTimer: KafkaTimer = _leaderElectionTimer
 }
+
+sealed trait ControllerEvent {
+  def process(): Unit
+}
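
The sealed ControllerEvent trait above, together with the addToControllerEventQueue calls in the listeners, is the whole contract of the new threading model: ZooKeeper callbacks enqueue events, and a single controller thread drains the queue and runs process(), so event bodies need no locking. The queue and thread plumbing sit earlier in KafkaController.scala and are not visible in this hunk; a minimal standalone sketch of the pattern (names and error handling are assumptions, not the actual implementation) looks like this:

import java.util.concurrent.LinkedBlockingQueue

// Minimal sketch of a single-threaded event queue (illustrative names, not Kafka's classes).
object EventQueueSketch {
  trait Event { def process(): Unit }

  class EventQueueRunner(threadName: String) {
    private val queue = new LinkedBlockingQueue[Event]()
    @volatile private var running = true

    // Producers (e.g. ZooKeeper callback threads) only ever enqueue.
    def put(event: Event): Unit = queue.put(event)

    // A single consumer thread runs every process() body, so no shared-state locking is needed.
    private val thread = new Thread(new Runnable {
      override def run(): Unit =
        while (running) {
          try queue.take().process()
          catch {
            case _: InterruptedException => running = false
            case t: Throwable => t.printStackTrace() // the real controller logs and keeps going
          }
        }
    }, threadName)

    def start(): Unit = thread.start()
    def shutdown(): Unit = { running = false; thread.interrupt() }
  }
}
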

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb663d04/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 8d99fe2..d1799a6 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -18,10 +18,10 @@ package kafka.controller
 
 import kafka.admin.AdminUtils
 import kafka.api.LeaderAndIsr
+import kafka.common.{LeaderElectionNotNeededException, NoReplicaOnlineException, StateChangeFailedException, TopicAndPartition}
 import kafka.log.LogConfig
-import kafka.utils.Logging
-import kafka.common.{LeaderElectionNotNeededException, TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 import kafka.server.{ConfigType, KafkaConfig}
+import kafka.utils.Logging
 
 trait PartitionLeaderSelector {