You are viewing a plain text version of this content. The canonical link for it is here.
Posted on 2017/10/18 16:15:06 UTC

[4/5] kafka git commit: KAFKA-5642; Use async ZookeeperClient in Controller
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 811ff67..d4a012e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,31 +16,28 @@
 package kafka.controller
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 import com.yammer.metrics.core.Gauge
-import kafka.admin.{AdminUtils, PreferredReplicaLeaderElectionCommand}
+import kafka.admin.AdminOperationException
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
-import kafka.log.LogConfig
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server._
-import kafka.utils.ZkUtils._
 import kafka.utils._
-import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, StopReplicaResponse, LeaderAndIsrResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, LeaderAndIsrResponse, StopReplicaResponse}
 import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 import scala.collection._
 import scala.util.Try
-class ControllerContext(val zkUtils: ZkUtils) {
+class ControllerContext {
   val stats = new ControllerStats
   var controllerChannelManager: ControllerChannelManager = null
@@ -133,33 +130,13 @@ object KafkaController extends Logging {
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
-  def parseControllerId(controllerInfoString: String): Int = {
-    try {
-      Json.parseFull(controllerInfoString) match {
-        case Some(js) => js.asJsonObject("brokerid").to[Int]
-        case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
-      }
-    } catch {
-      case _: Throwable =>
-        // It may be due to an incompatible controller register version
-        warn("Failed to parse the controller info as json. "
-          + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
-        try controllerInfoString.toInt
-        catch {
-          case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
-        }
-    }
-  }
-class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
+class KafkaController(val config: KafkaConfig, kafkaControllerZkUtils: KafkaControllerZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   this.logIdent = s"[Controller id=${config.brokerId}] "
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
-  val controllerContext = new ControllerContext(zkUtils)
-  val partitionStateMachine = new PartitionStateMachine(this, stateChangeLogger)
-  val replicaStateMachine = new ReplicaStateMachine(this, stateChangeLogger)
+  val controllerContext = new ControllerContext
   // have a separate scheduler for the controller to be able to start and stop independently of the kafka server
   // visible for testing
@@ -169,21 +146,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
     _ => updateMetrics())
-  val topicDeletionManager = new TopicDeletionManager(this, eventManager)
-  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
-  private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
-  private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
-  private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
+  val topicDeletionManager = new TopicDeletionManager(this, eventManager, kafkaControllerZkUtils)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
-  private val brokerChangeListener = new BrokerChangeListener(this, eventManager)
-  private val topicChangeListener = new TopicChangeListener(this, eventManager)
-  private val topicDeletionListener = new TopicDeletionListener(this, eventManager)
-  private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
-  private val partitionReassignmentListener = new PartitionReassignmentListener(this, eventManager)
-  private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this, eventManager)
-  private val isrChangeNotificationListener = new IsrChangeNotificationListener(this, eventManager)
-  private val logDirEventNotificationListener = new LogDirEventNotificationListener(this, eventManager)
+  val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, kafkaControllerZkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+  val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, kafkaControllerZkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+  private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
+  private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
+  private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
+  private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
+  private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
+  private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager)
+  private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager)
+  private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager)
+  private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)
   @volatile private var activeControllerId = -1
   @volatile private var offlinePartitionCount = 0
@@ -270,35 +246,40 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
    * This ensures another controller election will be triggered and there will always be an actively serving controller
   def onControllerFailover() {
-    info("Starting become controller state transition")
+    info("Reading controller epoch from zookeeper")
+    info("Incrementing controller epoch in zookeeper")
-    LogDirUtils.deleteLogDirEvents(zkUtils)
+    info("Registering handlers")
     // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
-    registerPartitionReassignmentListener()
-    registerIsrChangeNotificationListener()
-    registerPreferredReplicaElectionListener()
-    registerTopicChangeListener()
-    registerTopicDeletionListener()
-    registerBrokerChangeListener()
-    registerLogDirEventNotificationListener()
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(brokerChangeHandler)
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(topicChangeHandler)
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(topicDeletionHandler)
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(logDirEventNotificationHandler)
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(isrChangeNotificationHandler)
+    kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
+    kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
+    info("Deleting log dir event notifications")
+    kafkaControllerZkUtils.deleteLogDirEventNotifications()
+    info("Deleting isr change notifications")
+    kafkaControllerZkUtils.deleteIsrChangeNotifications()
+    info("Initializing controller context")
+    info("Fetching topic deletions in progress")
     val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
+    info("Initializing topic deletion manager")
     topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
     // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
     // are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
     // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
     // partitionStateMachine.startup().
+    info("Sending update metadata request")
-    // register the partition change listeners for all existing topics on failover
-    controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
     info(s"Ready to serve as the new controller with epoch $epoch")
@@ -323,10 +304,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def onControllerResignation() {
     // de-register listeners
-    deregisterIsrChangeNotificationListener()
-    deregisterPartitionReassignmentListener()
-    deregisterPreferredReplicaElectionListener()
-    deregisterLogDirEventNotificationListener()
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
+    kafkaControllerZkUtils.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
+    kafkaControllerZkUtils.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
     // reset topic deletion manager
@@ -339,15 +320,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     globalPartitionCount = 0
     // de-register partition ISR listener for on-going partition reassignment task
-    deregisterPartitionReassignmentIsrChangeListeners()
+    unregisterPartitionReassignmentIsrChangeHandlers()
     // shutdown partition state machine
-    deregisterTopicChangeListener()
-    partitionModificationsListeners.keys.foreach(deregisterPartitionModificationsListener)
-    deregisterTopicDeletionListener()
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
+    unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
     // shutdown replica state machine
-    deregisterBrokerChangeListener()
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
@@ -367,7 +348,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def onBrokerLogDirFailure(brokerIds: Seq[Int]) {
     // send LeaderAndIsrRequest for all replicas on those brokers to see if they are still online.
     val replicasOnBrokers = controllerContext.replicasOnBrokers(brokerIds.toSet)
-    replicaStateMachine.handleStateChanges(replicasOnBrokers, OnlineReplica)
+    replicaStateMachine.handleStateChanges(replicasOnBrokers.toSeq, OnlineReplica)
@@ -396,7 +377,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
-    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
+    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
     // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
     // to see if these brokers can become leaders for some/all of those
@@ -450,11 +431,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
-    partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
+    partitionStateMachine.handleStateChanges(partitionsWithoutLeader.toSeq, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     // trigger OfflineReplica state change for those newly offline replicas
-    replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion, OfflineReplica)
+    replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
     // fail deletion of topics that are affected by the offline replicas
     if (newOfflineReplicasForDeletion.nonEmpty) {
@@ -472,20 +453,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
-   * This callback is invoked by the partition state machine's topic change listener with the list of new topics
-   * and partitions as input. It does the following -
-   * 1. Registers partition change listener. This is not required until KAFKA-347
-   * 2. Invokes the new partition callback
-   * 3. Send metadata request with the new topic to all brokers so they allow requests for that topic to be served
-   */
-  def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
-    info("New topic creation callback for %s".format(newPartitions.mkString(",")))
-    // subscribe to partition changes
-    topics.foreach(topic => registerPartitionModificationsListener(topic))
-    onNewPartitionCreation(newPartitions)
-  }
-  /**
    * This callback is invoked by the topic change callback with the list of new partitions as input.
    * It does the following -
    * 1. Move the newly created partitions to the NewPartition state
@@ -493,10 +460,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
-    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
-    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
-    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
-    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
+    partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
+    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
+    partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
@@ -542,7 +509,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
+    if (!areReplicasInIsr(topicAndPartition, reassignedReplicas)) {
       info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
         "reassigned not yet caught up with the leader")
       val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
@@ -561,7 +528,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
       //5. replicas in RAR -> OnlineReplica
       reassignedReplicas.foreach { replica =>
-        replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
+        replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
           replica)), OnlineReplica)
       //6. Set AR to RAR in memory.
@@ -584,15 +551,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
-  private def watchIsrChangesForReassignedPartition(topic: String,
-                                                    partition: Int,
+  private def watchIsrChangesForReassignedPartition(partition: TopicAndPartition,
                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val isrChangeListener = new PartitionReassignmentIsrChangeListener(this, eventManager, topic, partition,
-      reassignedReplicas.toSet)
-    reassignedPartitionContext.isrChangeListener = isrChangeListener
+    val partitionReassignmentIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition)
+    reassignedPartitionContext.partitionReassignmentIsrChangeHandler = partitionReassignmentIsrChangeHandler
     // register listener on the leader and isr path to wait until they catch up with the current leader
-    zkUtils.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
+    kafkaControllerZkUtils.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
   def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@@ -610,7 +574,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
           } else {
             info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
             // first register ISR change listener
-            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+            watchIsrChangesForReassignedPartition(topicAndPartition, reassignedPartitionContext)
             controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
             // mark topic ineligible for deletion for the partitions being reassigned
@@ -629,7 +593,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
-      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+      partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
@@ -663,49 +627,38 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def incrementControllerEpoch() = {
-    try {
-      val newControllerEpoch = controllerContext.epoch + 1
-      val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
-        ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
-      if(!updateSucceeded)
+    val newControllerEpoch = controllerContext.epoch + 1
+    val setDataResponse = kafkaControllerZkUtils.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
+    if (Code.get(setDataResponse.rc) == Code.OK) {
+      controllerContext.epochZkVersion = setDataResponse.stat.getVersion
+      controllerContext.epoch = newControllerEpoch
+    } else if (Code.get(setDataResponse.rc) == Code.NONODE) {
+      // if path doesn't exist, this is the first controller whose epoch should be 1
+      // the following call can still fail if another controller gets elected between checking if the path exists and
+      // trying to create the controller epoch path
+      val createResponse = kafkaControllerZkUtils.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
+      if (Code.get(createResponse.rc) == Code.OK) {
+        controllerContext.epoch = KafkaController.InitialControllerEpoch
+        controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
+      } else if (Code.get(createResponse.rc) == Code.NODEEXISTS) {
         throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
-      else {
-        controllerContext.epochZkVersion = newVersion
-        controllerContext.epoch = newControllerEpoch
+      } else {
+        val exception = KeeperException.create(Code.get(createResponse.rc))
+        error("Error while incrementing controller epoch", exception)
+        throw exception
-    } catch {
-      case _: ZkNoNodeException =>
-        // if path doesn't exist, this is the first controller whose epoch should be 1
-        // the following call can still fail if another controller gets elected between checking if the path exists and
-        // trying to create the controller epoch path
-        try {
-          zkUtils.createPersistentPath(ZkUtils.ControllerEpochPath, KafkaController.InitialControllerEpoch.toString)
-          controllerContext.epoch = KafkaController.InitialControllerEpoch
-          controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
-        } catch {
-          case _: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
-            "Aborting controller startup procedure")
-          case oe: Throwable => error("Error while incrementing controller epoch", oe)
-        }
-      case oe: Throwable => error("Error while incrementing controller epoch", oe)
+    } else {
+      throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
-    info(s"Incremented epoch to ${controllerContext.epoch}")
-  }
-  private def registerSessionExpirationListener() = {
-    zkUtils.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
-  }
-  private def registerControllerChangeListener() = {
-    zkUtils.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
+    info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
   private def initializeControllerContext() {
     // update controller cache with delete topic information
-    controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet
-    controllerContext.allTopics = zkUtils.getAllTopics().toSet
-    controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
+    controllerContext.liveBrokers = kafkaControllerZkUtils.getAllBrokersInCluster.toSet
+    controllerContext.allTopics = kafkaControllerZkUtils.getAllTopicsInCluster.toSet
+    registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
+    controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ kafkaControllerZkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
     controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
     // update the leader and isr cache for all existing partitions from Zookeeper
@@ -719,7 +672,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = {
-    val partitionsUndergoingPreferredReplicaElection = zkUtils.getPartitionsUndergoingPreferredReplicaElection()
+    val partitionsUndergoingPreferredReplicaElection = kafkaControllerZkUtils.getPreferredReplicaElection
     // check if they are already completed or topic was deleted
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
@@ -755,7 +708,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
-    val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
+    val partitionsBeingReassigned = kafkaControllerZkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
     // check if they are already completed or topic was deleted
     val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
@@ -774,7 +727,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
-    val topicsToBeDeleted = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
+    val topicsToBeDeleted = kafkaControllerZkUtils.getTopicDeletions.toSet
     val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) =>
       replicas.exists(r => !controllerContext.isReplicaOnline(r, partition))
@@ -797,15 +750,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
-  def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
-    val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(topicAndPartitions)
-    for ((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
-      controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
+  def updateLeaderAndIsrCache(partitions: Seq[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
+    val leaderIsrAndControllerEpochs = kafkaControllerZkUtils.getTopicPartitionStates(partitions)
+    leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
+      controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    }
-  private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
-    zkUtils.getLeaderAndIsrForPartition(topic, partition).exists { leaderAndIsr =>
-      replicas.forall(leaderAndIsr.isr.contains)
+  private def areReplicasInIsr(partition: TopicAndPartition, replicas: Seq[Int]): Boolean = {
+    kafkaControllerZkUtils.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
+      replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
@@ -821,7 +775,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
       // move the leader to one of the alive and caught up new replicas
-      partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
+      partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
     } else {
       // check if the leader is alive or not
       if (controllerContext.isReplicaOnline(currentLeader, topicAndPartition)) {
@@ -832,7 +786,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       } else {
         info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
           "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
-        partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
+        partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
@@ -844,22 +798,28 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     val partition = topicAndPartition.partition
     // first move the replica to offline state (the controller removes it from the ISR)
     val replicasToBeDeleted = oldReplicas.map(r => PartitionAndReplica(topic, partition, r))
-    replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica)
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, OfflineReplica)
     // send stop replica command to the old replicas
-    replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted)
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionStarted)
     // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed
-    replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful)
-    replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica)
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionSuccessful)
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, NonExistentReplica)
-  private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
+  private def updateAssignedReplicasForPartition(partition: TopicAndPartition,
                                                  replicas: Seq[Int]) {
-    val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
-    partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
-    updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
-    info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
-    // update the assigned replica list after a successful zookeeper write
-    controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
+    val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(partition.topic))
+    partitionsAndReplicasForThisTopic.put(partition, replicas)
+    val setDataResponse = kafkaControllerZkUtils.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
+    if (Code.get(setDataResponse.rc) == Code.OK) {
+      info("Updated assigned replicas for partition %s being reassigned to %s ".format(partition, replicas.mkString(",")))
+      // update the assigned replica list after a successful zookeeper write
+      controllerContext.partitionReplicaAssignment.put(partition, replicas)
+    } else if (Code.get(setDataResponse.rc) == Code.NONODE) {
+      throw new IllegalStateException("Topic %s doesn't exist".format(partition.topic))
+    } else {
+      throw new KafkaException(KeeperException.create(Code.get(setDataResponse.rc)))
+    }
   private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
@@ -868,18 +828,18 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
     // replicas list
     newReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
+      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
-  private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+  private def updateLeaderEpochAndSendRequest(partition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
     val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
-    updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
+    updateLeaderEpoch(partition) match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
         try {
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
-            topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, partition.topic,
+            partition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew = false)
         } catch {
           case e: IllegalStateException =>
@@ -887,97 +847,43 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
         stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with new assigned replica " +
           s"list ${newAssignedReplicas.mkString(",")} to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
-          s"for partition being reassigned $topicAndPartition")
+          s"for partition being reassigned $partition")
       case None => // fail the reassignment
         stateChangeLog.error("Failed to send LeaderAndIsr request with new assigned replica list " +
-          s"${newAssignedReplicas.mkString( ",")} to leader for partition being reassigned $topicAndPartition")
+          s"${newAssignedReplicas.mkString( ",")} to leader for partition being reassigned $partition")
-  private def registerBrokerChangeListener() = {
-    zkUtils.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
-  }
-  private def deregisterBrokerChangeListener() = {
-    zkUtils.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
-  }
-  private def registerTopicChangeListener() = {
-    zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
-  }
-  private def deregisterTopicChangeListener() = {
-    zkUtils.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener)
-  }
-  def registerPartitionModificationsListener(topic: String) = {
-    partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, eventManager, topic))
-    zkUtils.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
-  }
-  def deregisterPartitionModificationsListener(topic: String) = {
-    zkUtils.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
-    partitionModificationsListeners.remove(topic)
-  }
-  private def registerTopicDeletionListener() = {
-    zkUtils.subscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
-  }
-  private def deregisterTopicDeletionListener() = {
-    zkUtils.unsubscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
-  }
-  private def registerPartitionReassignmentListener() = {
-    zkUtils.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
-  }
-  private def deregisterPartitionReassignmentListener() = {
-    zkUtils.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
-  }
-  private def registerIsrChangeNotificationListener() = {
-    debug("Registering IsrChangeNotificationListener")
-    zkUtils.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
-  }
-  private def deregisterIsrChangeNotificationListener() = {
-    debug("De-registering IsrChangeNotificationListener")
-    zkUtils.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
-  }
-  private def registerPreferredReplicaElectionListener() {
-    zkUtils.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+  private def registerPartitionModificationsHandlers(topics: Seq[String]) = {
+    topics.foreach { topic =>
+      val partitionModificationsHandler = new PartitionModificationsHandler(this, eventManager, topic)
+      partitionModificationsHandlers.put(topic, partitionModificationsHandler)
+    }
+    partitionModificationsHandlers.values.foreach(kafkaControllerZkUtils.registerZNodeChangeHandler)
-  private def deregisterPreferredReplicaElectionListener() {
-    zkUtils.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+  def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
+    topics.foreach { topic =>
+      partitionModificationsHandlers.remove(topic)
+        .foreach(handler => kafkaControllerZkUtils.unregisterZNodeChangeHandler(handler.path))
+    }
-  private def deregisterPartitionReassignmentIsrChangeListeners() {
+  private def unregisterPartitionReassignmentIsrChangeHandlers() {
     controllerContext.partitionsBeingReassigned.foreach {
       case (topicAndPartition, reassignedPartitionsContext) =>
-        val zkPartitionPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
-        zkUtils.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
+        val partitionReassignmentIsrChangeHandler =
+          reassignedPartitionsContext.partitionReassignmentIsrChangeHandler
+        kafkaControllerZkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
-  private def registerLogDirEventNotificationListener() = {
-    debug("Registering logDirEventNotificationListener")
-    zkUtils.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
-  }
-  private def deregisterLogDirEventNotificationListener() = {
-    debug("De-registering logDirEventNotificationListener")
-    zkUtils.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
-  }
   private def readControllerEpochFromZookeeper() {
     // initialize the controller epoch and zk version by reading from zookeeper
-    if(controllerContext.zkUtils.pathExists(ZkUtils.ControllerEpochPath)) {
-      val epochData = controllerContext.zkUtils.readData(ZkUtils.ControllerEpochPath)
-      controllerContext.epoch = epochData._1.toInt
-      controllerContext.epochZkVersion = epochData._2.getVersion
+    val epochAndStatOpt = kafkaControllerZkUtils.getControllerEpoch
+    epochAndStatOpt.foreach { case (epoch, stat) =>
+      controllerContext.epoch = epoch
+      controllerContext.epochZkVersion = stat.getVersion
       info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
@@ -985,33 +891,35 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
     if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
       // stop watching the ISR changes for this partition
-      zkUtils.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
-        controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+      val partitionReassignmentIsrChangeHandler =
+        controllerContext.partitionsBeingReassigned(topicAndPartition).partitionReassignmentIsrChangeHandler
+      kafkaControllerZkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
     // read the current list of reassigned partitions from zookeeper
-    val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
+    val partitionsBeingReassigned = kafkaControllerZkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
     // remove this partition from that list
     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
     // write the new list to zookeeper
-    if (updatedPartitionsBeingReassigned.size < partitionsBeingReassigned.size)
-      zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
+    val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
+    if (reassignment.isEmpty) {
+      info("No more partitions need to be reassigned. Deleting zk path %s".format(ReassignPartitionsZNode.path))
+      kafkaControllerZkUtils.deletePartitionReassignment()
+    } else {
+      val setDataResponse = kafkaControllerZkUtils.setPartitionReassignmentRaw(reassignment)
+      if (Code.get(setDataResponse.rc) == Code.OK) {
+      } else if (Code.get(setDataResponse.rc) == Code.NONODE) {
+        val createDataResponse = kafkaControllerZkUtils.createPartitionReassignment(reassignment)
+        if (Code.get(createDataResponse.rc) != Code.OK) {
+          throw new AdminOperationException(KeeperException.create(Code.get(createDataResponse.rc)))
+        }
+      } else {
+        throw new AdminOperationException(KeeperException.create(Code.get(setDataResponse.rc)))
+      }
+    }
     // update the cache. NO-OP if the partition's reassignment was never started
-  def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
-                                         newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
-    try {
-      val zkPath = getTopicPath(topicAndPartition.topic)
-      val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => e._1.partition.toString -> e._2))
-      zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
-      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
-    } catch {
-      case _: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
-      case e2: Throwable => throw new KafkaException(e2.toString)
-    }
-  }
   def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
                                                    isTriggeredByAutoRebalance : Boolean) {
     for(partition <- partitionsToBeRemoved) {
@@ -1025,7 +933,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     if (!isTriggeredByAutoRebalance)
-      zkUtils.deletePath(ZkUtils.PreferredReplicaLeaderElectionPath)
+      kafkaControllerZkUtils.deletePreferredReplicaElection()
@@ -1046,93 +954,22 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
-   * Removes a given partition replica from the ISR; if it is not the current
-   * leader and there are sufficient remaining replicas in ISR.
-   *
-   * @param topic topic
-   * @param partition partition
-   * @param replicaId replica Id
-   * @return the new leaderAndIsr (with the replica removed if it was present),
-   *         or None if leaderAndIsr is empty.
-   */
-  def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = {
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    debug("Removing replica %d from ISR %s for partition %s.".format(replicaId,
-      controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.isr.mkString(","), topicAndPartition))
-    var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
-    var zkWriteCompleteOrUnnecessary = false
-    while (!zkWriteCompleteOrUnnecessary) {
-      // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
-      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
-        case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
-          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
-          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
-          if(controllerEpoch > epoch)
-            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
-              "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
-              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
-          if (leaderAndIsr.isr.contains(replicaId)) {
-            // if the replica to be removed from the ISR is also the leader, set the new leader value to -1
-            val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
-            var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
-            // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
-            // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
-            // eventually be restored as the leader.
-            if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkUtils,
-              ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
-              info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
-              newIsr = leaderAndIsr.isr
-            }
-            val newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
-            // update the new leadership decision in zookeeper or retry
-            val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
-              newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
-            val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
-            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion, epoch))
-            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
-            if (updateSucceeded) {
-              info(s"New leader and ISR for partition $topicAndPartition is $leaderWithNewVersion")
-            }
-            updateSucceeded
-          } else {
-            warn(s"Cannot remove replica $replicaId from ISR of partition $topicAndPartition since it is not in the ISR." +
-              s" Leader = ${leaderAndIsr.leader} ; ISR = ${leaderAndIsr.isr}")
-            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
-            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
-            true
-          }
-        case None =>
-          warn("Cannot remove replica %d from ISR of %s - leaderAndIsr is empty.".format(replicaId, topicAndPartition))
-          true
-      }
-    }
-    finalLeaderIsrAndControllerEpoch
-  }
-  /**
    * Does not change leader or isr, but just increments the leader epoch
-   * @param topic topic
    * @param partition partition
    * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
-  private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    debug("Updating leader epoch for partition %s.".format(topicAndPartition))
+  private def updateLeaderEpoch(partition: TopicAndPartition): Option[LeaderIsrAndControllerEpoch] = {
+    debug("Updating leader epoch for partition %s.".format(partition))
     var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
-      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
-        case Some(leaderIsrAndEpoch) =>
-          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
-          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
-          if(controllerEpoch > epoch)
+      zkWriteCompleteOrUnnecessary = kafkaControllerZkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+          val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+          if (controllerEpoch > epoch)
             throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
               "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
               "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
@@ -1140,19 +977,19 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
           // assigned replica list
           val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
           // update the new leadership decision in zookeeper or retry
-          val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic,
-            partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
-          val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
-          finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion, epoch))
-          if (updateSucceeded) {
-            info(s"Updated leader epoch for partition $topicAndPartition to ${leaderWithNewVersion.leaderEpoch}")
-          }
-          updateSucceeded
+          val (successfulUpdates, updatesToRetry, failedUpdates) =
+            kafkaControllerZkUtils.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
+          if (successfulUpdates.contains(partition)) {
+            val finalLeaderAndIsr = successfulUpdates(partition)
+            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
+            info(s"Updated leader epoch for partition $partition to ${finalLeaderAndIsr.leaderEpoch}")
+            true
+          } else if (failedUpdates.contains(partition)) {
+            throw failedUpdates(partition)
+          } else false
         case None =>
-          throw new IllegalStateException(s"Cannot update leader epoch for partition $topicAndPartition as " +
+          throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +
             "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist")
-          true
@@ -1194,654 +1031,565 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
-  def getControllerID(): Int = {
-    controllerContext.zkUtils.readDataMaybeNull(ZkUtils.ControllerPath)._1 match {
-      case Some(controller) => KafkaController.parseControllerId(controller)
-      case None => -1
-    }
-  }
-  case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent {
+  case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
-    def state = ControllerState.BrokerChange
+    def state = ControllerState.AutoLeaderBalance
     override def process(): Unit = {
       if (!isActive) return
-      // Read the current broker list from ZK again instead of using currentBrokerList to increase
-      // the odds of processing recent broker changes in a single ControllerEvent (KAFKA-5502).
-      val curBrokers = zkUtils.getAllBrokersInCluster().toSet
-      val curBrokerIds = curBrokers.map(_.id)
-      val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
-      val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
-      val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
-      val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
-      controllerContext.liveBrokers = curBrokers
-      val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
-      val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
-      val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
-      info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
-        .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
-      newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
-      deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
-      if (newBrokerIds.nonEmpty)
-        onBrokerStartup(newBrokerIdsSorted)
-      if (deadBrokerIds.nonEmpty)
-        onBrokerFailure(deadBrokerIdsSorted)
+      try {
+        checkAndTriggerAutoLeaderRebalance()
+      } finally {
+        scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
+      }
-  case class TopicChange(topics: Set[String]) extends ControllerEvent {
+  case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
-    def state = ControllerState.TopicChange
+    def state = ControllerState.ControlledShutdown
     override def process(): Unit = {
-      if (!isActive) return
-      val newTopics = topics -- controllerContext.allTopics
-      val deletedTopics = controllerContext.allTopics -- topics
-      controllerContext.allTopics = topics
-      val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
-      controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
-        !deletedTopics.contains(p._1.topic))
-      controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
-      info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
-        deletedTopics, addedPartitionReplicaAssignment))
-      if (newTopics.nonEmpty)
-        onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
+      val controlledShutdownResult = Try { doControlledShutdown(id) }
+      controlledShutdownCallback(controlledShutdownResult)
-  }
-  case class PartitionModifications(topic: String) extends ControllerEvent {
+    private def doControlledShutdown(id: Int): Set[TopicAndPartition] = {
+      if (!isActive) {
+        throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
+      }
-    def state = ControllerState.TopicChange
+      info("Shutting down broker " + id)
-    override def process(): Unit = {
-      if (!isActive) return
-      val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
-      val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
-        !controllerContext.partitionReplicaAssignment.contains(p._1))
-      if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
-        error("Skipping adding partitions %s for topic %s since it is currently being deleted"
-          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
-      else {
-        if (partitionsToBeAdded.nonEmpty) {
-          info(s"New partitions to be added $partitionsToBeAdded")
-          controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
-          onNewPartitionCreation(partitionsToBeAdded.keySet)
+      if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
+        throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+      controllerContext.shuttingDownBrokerIds.add(id)
+      debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
+      debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
+      val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter { partition =>
+        controllerContext.partitionReplicaAssignment(partition).size > 1 && controllerContext.partitionLeadershipInfo.contains(partition)
+      }
+      val (partitionsLeadByBroker, partitionsFollowedByBroker) = partitionsToActOn.partition { partition =>
+        controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
+      }
+      partitionStateMachine.handleStateChanges(partitionsLeadByBroker.toSeq, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy))
+      try {
+        brokerRequestBatch.newBatch()
+        partitionsFollowedByBroker.foreach { partition =>
+          brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition.topic,
+            partition.partition, deletePartition = false, null)
+        brokerRequestBatch.sendRequestsToBrokers(epoch)
+      } catch {
+        case e: IllegalStateException =>
+          handleIllegalState(e)
+      }
+      // If the broker is a follower, updates the isr in ZK and notifies the current leader
+      replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition => PartitionAndReplica(partition.topic, partition.partition, id)).toSeq, OfflineReplica)
+      def replicatedPartitionsBrokerLeads() = {
+        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+        controllerContext.partitionLeadershipInfo.filter {
+          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
+            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+        }.keys
+      replicatedPartitionsBrokerLeads().toSet
-  case class TopicDeletion(var topicsToBeDeleted: Set[String]) extends ControllerEvent {
+  case class LeaderAndIsrResponseReceived(LeaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
-    def state = ControllerState.TopicDeletion
+    def state = ControllerState.LeaderAndIsrResponseReceived
     override def process(): Unit = {
+      import JavaConverters._
       if (!isActive) return
-      debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
-      val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
-      if (nonExistentTopics.nonEmpty) {
-        warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
-        nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
+      val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
+      if (leaderAndIsrResponse.error != Errors.NONE) {
+        stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
+        return
-      topicsToBeDeleted --= nonExistentTopics
-      if (config.deleteTopicEnable) {
-        if (topicsToBeDeleted.nonEmpty) {
-          info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
-          // mark topic ineligible for deletion if other state changes are in progress
-          topicsToBeDeleted.foreach { topic =>
-            val partitionReassignmentInProgress =
-            if (partitionReassignmentInProgress)
-              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
-          }
-          // add topic to deletion list
-          topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
-        }
-      } else {
-        // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
-        for (topic <- topicsToBeDeleted) {
-          info(s"Removing ${getDeleteTopicPath(topic)} since delete topic is disabled")
-          zkUtils.deletePath(getDeleteTopicPath(topic))
-        }
+      val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
+        new TopicAndPartition(_)).toSet
+      val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
+        new TopicAndPartition(_)).toSet
+      val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
+      val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
+      controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
+      val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
+      if (newOfflineReplicas.nonEmpty) {
+        stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
+        onReplicasBecomeOffline(newOfflineReplicas.map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)))
-  case class PartitionReassignment(partitionReassignment: Map[TopicAndPartition, Seq[Int]]) extends ControllerEvent {
+  case class TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
-    def state = ControllerState.PartitionReassignment
+    def state = ControllerState.TopicDeletion
     override def process(): Unit = {
+      import JavaConverters._
       if (!isActive) return
-      val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
-      partitionsToBeReassigned.foreach { partitionToBeReassigned =>
-        if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
-          error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
-            .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
-          removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
-        } else {
-          val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
-          initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
-        }
+      val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
+      debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
+      val responseMap = stopReplicaResponse.responses.asScala
+      val partitionsInError =
+        if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
+        else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
+      val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
+      // move all the failed replicas to ReplicaDeletionIneligible
+      topicDeletionManager.failReplicaDeletion(replicasInError)
+      if (replicasInError.size != responseMap.size) {
+        // some replicas could have been successfully deleted
+        val deletedReplicas = responseMap.keySet -- partitionsInError
+        topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
-  case class PartitionReassignmentIsrChange(topicAndPartition: TopicAndPartition, reassignedReplicas: Set[Int]) extends ControllerEvent {
+  case object Startup extends ControllerEvent {
-    def state = ControllerState.PartitionReassignment
+    def state = ControllerState.ControllerChange
     override def process(): Unit = {
-      if (!isActive) return
-        // check if this partition is still being reassigned or not
-      controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignedPartitionContext =>
-        // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
-        val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
-        newLeaderAndIsrOpt match {
-          case Some(leaderAndIsr) => // check if new replicas have joined ISR
-            val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
-            if(caughtUpReplicas == reassignedReplicas) {
-              // resume the partition reassignment process
-              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
-                "Resuming partition reassignment")
-              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-            }
-            else {
-              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
-                "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
-            }
-          case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
-            .format(topicAndPartition, reassignedReplicas.mkString(",")))
-        }
-      }
+      kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+      elect()
-  }
-  case class IsrChangeNotification(sequenceNumbers: Seq[String]) extends ControllerEvent {
+  }
-    def state = ControllerState.IsrChange
+  private def updateMetrics(): Unit = {
+    offlinePartitionCount =
+      if (!isActive) {
+        0
+      } else {
+        controllerContext.partitionLeadershipInfo.count { case (tp, leadershipInfo) =>
+          !controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader) &&
+            !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
+        }
+      }
-    override def process(): Unit = {
-      // Read the current isr change notification znodes from ZK again instead of using sequenceNumbers
-      // to increase the odds of processing recent isr changes in a single ControllerEvent
-      // and to reduce the odds of trying to access znodes that have already been deleted (KAFKA-5879).
-      val currentSequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.IsrChangeNotificationPath)
-      if (!isActive) return
-      try {
-        val topicAndPartitions = currentSequenceNumbers.flatMap(getTopicAndPartition).toSet
-        if (topicAndPartitions.nonEmpty) {
-          updateLeaderAndIsrCache(topicAndPartitions)
-          processUpdateNotifications(topicAndPartitions)
+    preferredReplicaImbalanceCount =
+      if (!isActive) {
+        0
+      } else {
+        controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
+          val preferredReplica = replicas.head
+          val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
+          leadershipInfo.map(_.leaderAndIsr.leader != preferredReplica).getOrElse(false) &&
+            !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic)
-      } finally {
-        // delete the notifications
-        currentSequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
-    }
-    private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
-      val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
-      debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
-      sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
-    }
+    globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size
-    private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
-      val changeZnode = ZkUtils.IsrChangeNotificationPath + "/" + child
-      val (jsonOpt, _) = controllerContext.zkUtils.readDataMaybeNull(changeZnode)
-      jsonOpt.map { json =>
-        Json.parseFull(json) match {
-          case Some(js) =>
-            val isrChanges = js.asJsonObject
-            isrChanges("partitions").asJsonArray.iterator.map(_.asJsonObject).map { tpJs =>
-              val topic = tpJs("topic").to[String]
-              val partition = tpJs("partition").to[Int]
-              TopicAndPartition(topic, partition)
-            }.toSet
-          case None =>
-            error(s"Invalid topic and partition JSON in ZK. ZK notification node: $changeZnode, JSON: $json")
-            Set.empty[TopicAndPartition]
-        }
-      }.getOrElse(Set.empty[TopicAndPartition])
-    }
+    globalPartitionCount = if (!isActive) 0 else controllerContext.partitionLeadershipInfo.size
+  }
+  // visible for testing
+  private[controller] def handleIllegalState(e: IllegalStateException): Nothing = {
+    // Resign if the controller is in an illegal state
+    error("Forcing the controller to resign")
+    brokerRequestBatch.clear()
+    triggerControllerMove()
+    throw e
+  }
+  private def triggerControllerMove(): Unit = {
+    onControllerResignation()
+    activeControllerId = -1
+    kafkaControllerZkUtils.deleteController()
+  }
+  def expire(): Unit = {
+    val expireEvent = Expire()
+    eventManager.clearAndPut(expireEvent)
+    expireEvent.waitUntilProcessed()
-  case class LogDirEventNotification(sequenceNumbers: Seq[String]) extends ControllerEvent {
+  def newSession(): Unit = {
+    eventManager.put(Reelect)
+  }
-    def state = ControllerState.LogDirChange
+  def elect(): Unit = {
+    val timestamp = time.milliseconds
+    activeControllerId = kafkaControllerZkUtils.getControllerId.getOrElse(-1)
+    /*
+     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
+     * it's possible that the controller has already been elected when we get here. This check will prevent the following
+     * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
+     */
+    if (activeControllerId != -1) {
+      debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
+      return
+    }
-    override def process(): Unit = {
-      val zkUtils = controllerContext.zkUtils
-      try {
-        val brokerIds = sequenceNumbers.flatMap(LogDirUtils.getBrokerIdFromLogDirEvent(zkUtils, _))
-        onBrokerLogDirFailure(brokerIds)
-      } finally {
-        // delete processed children
-        sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x))
-      }
+    try {
+      kafkaControllerZkUtils.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
+      info(config.brokerId + " successfully elected as the controller")
+      activeControllerId = config.brokerId
+      onControllerFailover()
+    } catch {
+      case _: NodeExistsException =>
+        // If someone else has written the path, then
+        activeControllerId = kafkaControllerZkUtils.getControllerId.getOrElse(-1)
+        if (activeControllerId != -1)
+          debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
+        else
+          warn("A controller has been elected but just resigned, this will result in another round of election")
+      case e2: Throwable =>
+        error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
+        triggerControllerMove()
-  case class PreferredReplicaLeaderElection(partitions: Set[TopicAndPartition]) extends ControllerEvent {
-    def state = ControllerState.ManualLeaderBalance
+  case object BrokerChange extends ControllerEvent {
+    override def state: ControllerState = ControllerState.BrokerChange
     override def process(): Unit = {
       if (!isActive) return
-      val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
-      if (partitionsForTopicsToBeDeleted.nonEmpty) {
-        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
-          .format(partitionsForTopicsToBeDeleted))
-      }
-      onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
+      val curBrokers = kafkaControllerZkUtils.getAllBrokersInCluster.toSet
+      val curBrokerIds =
+      val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
+      val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
+      val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
+      val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
+      controllerContext.liveBrokers = curBrokers
+      val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
+      val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
+      val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
+      info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
+        .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
+      newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
+      deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
+      if (newBrokerIds.nonEmpty)
+        onBrokerStartup(newBrokerIdsSorted)
+      if (deadBrokerIds.nonEmpty)
+        onBrokerFailure(deadBrokerIdsSorted)
-  case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
+  case object TopicChange extends ControllerEvent {
+    override def state: ControllerState = ControllerState.TopicChange
-    def state = ControllerState.AutoLeaderBalance
+    override def process(): Unit = {
+      if (!isActive) return
+      val topics = kafkaControllerZkUtils.getAllTopicsInCluster.toSet
+      val newTopics = topics -- controllerContext.allTopics
+      val deletedTopics = controllerContext.allTopics -- topics
+      controllerContext.allTopics = topics
+      registerPartitionModificationsHandlers(newTopics.toSeq)
+      val addedPartitionReplicaAssignment = kafkaControllerZkUtils.getReplicaAssignmentForTopics(newTopics)
+      controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
+        !deletedTopics.contains(p._1.topic))
+      controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
+      info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
+        deletedTopics, addedPartitionReplicaAssignment))
+      if (addedPartitionReplicaAssignment.nonEmpty)
+        onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
+    }
+  }
+  case object LogDirEventNotification extends ControllerEvent {
+    override def state: ControllerState = ControllerState.LogDirChange
     override def process(): Unit = {
       if (!isActive) return
+      val sequenceNumbers = kafkaControllerZkUtils.getAllLogDirEventNotifications
       try {
-        checkAndTriggerAutoLeaderRebalance()
+        val brokerIds = kafkaControllerZkUtils.getBrokerIdsFromLogDirEvents(sequenceNumbers)
+        onBrokerLogDirFailure(brokerIds)
       } finally {
-        scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
+        // delete processed children
+        kafkaControllerZkUtils.deleteLogDirEventNotifications(sequenceNumbers)
-  case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
-    def state = ControllerState.ControlledShutdown
+  case class PartitionModifications(topic: String) extends ControllerEvent {
+    override def state: ControllerState = ControllerState.TopicChange
     override def process(): Unit = {
-      val controlledShutdownResult = Try { doControlledShutdown(id) }
-      controlledShutdownCallback(controlledShutdownResult)
-    }
-    private def doControlledShutdown(id: Int): Set[TopicAndPartition] = {
-      if (!isActive) {
-        throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
+      if (!isActive) return
+      val partitionReplicaAssignment = kafkaControllerZkUtils.getReplicaAssignmentForTopics(immutable.Set(topic))
+      val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
+        !controllerContext.partitionReplicaAssignment.contains(p._1))
+      if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
+        error("Skipping adding partitions %s for topic %s since it is currently being deleted"
+          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+      else {
+        if (partitionsToBeAdded.nonEmpty) {
+          info(s"New partitions to be added $partitionsToBeAdded")
+          controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
+          onNewPartitionCreation(partitionsToBeAdded.keySet)
+        }
+    }
+  }
-      info("Shutting down broker " + id)
-      if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
-        throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+  case object TopicDeletion extends ControllerEvent {
+    override def state: ControllerState = ControllerState.TopicDeletion
-      controllerContext.shuttingDownBrokerIds.add(id)
-      debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
-      debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
-      val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
-          controllerContext.partitionsOnBroker(id)
-            .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
-      allPartitionsAndReplicationFactorOnBroker.foreach { case (topicAndPartition, replicationFactor) =>
-        controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
-          if (replicationFactor > 1) {
-            if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
-              // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
-              // notifies all affected brokers
-              partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                controlledShutdownPartitionLeaderSelector)
-            } else {
-              // Stop the replica first. The state change below initiates ZK changes which should take some time
-              // before which the stop replica request should be completed (in most cases)
-              try {
-                brokerRequestBatch.newBatch()
-                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
-                  topicAndPartition.partition, deletePartition = false)
-                brokerRequestBatch.sendRequestsToBrokers(epoch)
-              } catch {
-                case e: IllegalStateException =>
-                  handleIllegalState(e)
-              }
-              // If the broker is a follower, updates the isr in ZK and notifies the current leader
-              replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
-                topicAndPartition.partition, id)), OfflineReplica)
-            }
+    override def process(): Unit = {
+      if (!isActive) return
+      var topicsToBeDeleted = kafkaControllerZkUtils.getTopicDeletions.toSet
+      debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
+      val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
+      if (nonExistentTopics.nonEmpty) {
+        warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
+        kafkaControllerZkUtils.deleteTopicDeletions(nonExistentTopics.toSeq)
+      }
+      topicsToBeDeleted --= nonExistentTopics
+      if (config.deleteTopicEnable) {
+        if (topicsToBeDeleted.nonEmpty) {
+          info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
+          // mark topic ineligible for deletion if other state changes are in progress
+          topicsToBeDeleted.foreach { topic =>
+            val partitionReassignmentInProgress =
+              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+            if (partitionReassignmentInProgress)
+              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
+          // add topic to deletion list
+          topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
+      } else {
+        // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
+        info(s"Removing $topicsToBeDeleted since delete topic is disabled")
+        kafkaControllerZkUtils.deleteTopicDeletions(topicsToBeDeleted.toSeq)
-      def replicatedPartitionsBrokerLeads() = {
-        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
-        controllerContext.partitionLeadershipInfo.filter {
-          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
-            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
-        }.keys
-      }
-      replicatedPartitionsBrokerLeads().toSet
-  case class LeaderAndIsrResponseReceived(LeaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
-    def state = ControllerState.LeaderAndIsrResponseReceived
+  case object PartitionReassignment extends ControllerEvent {
+    override def state: ControllerState = ControllerState.PartitionReassignment
     override def process(): Unit = {
-      import JavaConverters._
-      val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
-      if (leaderAndIsrResponse.error != Errors.NONE) {
-        stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
-        return
+      if (!isActive) return
+      kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
+      val partitionReassignment = kafkaControllerZkUtils.getPartitionReassignment
+      val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
+      partitionsToBeReassigned.foreach { partitionToBeReassigned =>
+        if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
+          error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
+            .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
+          removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
+        } else {
+          val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
+          initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+        }
+    }
+  }
-      val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
-        new TopicAndPartition(_)).toSet
-      val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
-        new TopicAndPartition(_)).toSet
-      val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
-      val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
-      controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
-      val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
+  case class PartitionReassignmentIsrChange(partition: TopicAndPartition) extends ControllerEvent {
+    override def state: ControllerState = ControllerState.PartitionReassignment
-      if (newOfflineReplicas.nonEmpty) {
-        stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
-        onReplicasBecomeOffline(newOfflineReplicas.map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)))
+    override def process(): Unit = {
+      if (!isActive) return
+      // check if this partition is still being reassigned or not
+      controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
+        val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
+        kafkaControllerZkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+          case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
+            val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
+            if(caughtUpReplicas == reassignedReplicas) {
+              // resume the partition reassignment process
+              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
+                "Resuming partition reassignment")
+              onPartitionReassignment(partition, reassignedPartitionContext)
+            }
+            else {
+              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
+                "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
+            }
+          case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
+            .format(partit