You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/18 16:15:03 UTC

[1/5] kafka git commit: KAFKA-5642; Use async ZookeeperClient in Controller

Repository: kafka
Updated Branches:
  refs/heads/trunk 68f324f4b -> b71ee043f


http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
new file mode 100644
index 0000000..1214344
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.log.LogConfig
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.data.Stat
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.collection.mutable
+
+/**
+ * Unit tests for the transitions driven by `PartitionStateMachine.handleStateChanges`.
+ *
+ * Every test works on the single partition `partition`: it seeds `partitionState` and
+ * `controllerContext`, records the mock interactions the transition is expected to make (if any),
+ * runs the transition and finally checks the state stored for the partition.
+ */
+class PartitionStateMachineTest extends JUnitSuite {
+  // Collaborators and the state map under test; each is rebuilt by setUp() before every test.
+  private var controllerContext: ControllerContext = _
+  private var mockZkUtils: KafkaControllerZkUtils = _
+  private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = _
+  private var mockTopicDeletionManager: TopicDeletionManager = _
+  private var partitionState: mutable.Map[TopicAndPartition, PartitionState] = _
+  private var partitionStateMachine: PartitionStateMachine = _
+
+  // Fixed test data: one broker and one partition ("t", 0).
+  private val brokerId = 5
+  private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
+  private val controllerEpoch = 50
+  private val partition = TopicAndPartition("t", 0)
+  private val partitions = Seq(partition)
+
+  /** Builds a fresh context, fresh mocks, an empty state map and the state machine under test. */
+  @Before
+  def setUp(): Unit = {
+    controllerContext = new ControllerContext
+    controllerContext.epoch = controllerEpoch
+    mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
+    mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
+    mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
+    partitionState = mutable.Map.empty[TopicAndPartition, PartitionState]
+    val stateChangeLogger = new StateChangeLogger(brokerId, true, None)
+    partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext,
+      mockTopicDeletionManager, mockZkUtils, partitionState, mockControllerBrokerRequestBatch)
+  }
+
+  // ---------------------------------------------------------------------------------------------
+  // Shared helpers
+  // ---------------------------------------------------------------------------------------------
+
+  /** Makes `brokerId` the only live broker and the only assigned replica of `partition`. */
+  private def useSingleLiveReplica(): Unit = {
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0))
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+  }
+
+  /**
+   * Seeds a NewPartition whose only replica is the live broker `brokerId` and returns the
+   * leadership record the tests expect to be written for it.
+   */
+  private def seedNewPartition(): LeaderIsrAndControllerEpoch = {
+    useSingleLiveReplica()
+    partitionState.put(partition, NewPartition)
+    LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
+  }
+
+  /**
+   * Seeds an OfflinePartition with ISR `List(brokerId)` and no leader, stores that leadership in
+   * the controller context, and returns both the raw LeaderAndIsr and the wrapped record.
+   */
+  private def seedOfflineWithoutLeader(): (LeaderAndIsr, LeaderIsrAndControllerEpoch) = {
+    useSingleLiveReplica()
+    partitionState.put(partition, OfflinePartition)
+    val leaderless = LeaderAndIsr(LeaderAndIsr.NoLeader, List(brokerId))
+    val leadership = LeaderIsrAndControllerEpoch(leaderless, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leadership)
+    (leaderless, leadership)
+  }
+
+  /** Records the expectation that a new request batch is opened. */
+  private def expectNewBatch(): Unit =
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+
+  /** Records the expectation that the batch is sent with `controllerEpoch`. */
+  private def expectBatchSent(): Unit =
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+
+  /**
+   * Records a `getTopicPartitionStatesRaw(partitions)` call answered with one response that
+   * carries `resultCode`, the encoded `stored` leadership and an all-zero Stat.
+   */
+  private def expectStateLookup(resultCode: Code, stored: LeaderIsrAndControllerEpoch): Unit = {
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+      .andReturn(Seq(GetDataResponse(resultCode.intValue(), null, partition,
+        TopicPartitionStateZNode.encode(stored), stat)))
+  }
+
+  /**
+   * Records an `updateLeaderAndIsr` call for `elected` that succeeds and hands back the same
+   * value with zkVersion 2; returns that value so callers can build follow-up expectations.
+   */
+  private def expectLeaderAndIsrUpdate(elected: LeaderAndIsr): LeaderAndIsr = {
+    val persisted = elected.withZkVersion(2)
+    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> elected), controllerEpoch))
+      .andReturn((Map(partition -> persisted), Seq.empty, Map.empty))
+    persisted
+  }
+
+  /** Switches the two mocks that carry expectations into replay mode. */
+  private def replayMocks(): Unit =
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+
+  /** Verifies that every recorded expectation on the two mocks was met. */
+  private def verifyMocks(): Unit =
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+
+  /**
+   * Optionally seeds `initial`, requests `target` without an election strategy and asserts that
+   * the partition ends up in `expected`.
+   */
+  private def assertPlainTransition(initial: Option[PartitionState],
+                                    target: PartitionState,
+                                    expected: PartitionState): Unit = {
+    initial.foreach(state => partitionState.put(partition, state))
+    partitionStateMachine.handleStateChanges(partitions, target)
+    assertEquals(expected, partitionState(partition))
+  }
+
+  /**
+   * Replays the mocks, requests OnlinePartition with the offline-partition election strategy,
+   * verifies the mocks and asserts that the partition ends up in `expected`.
+   */
+  private def electWithOfflineStrategyAndExpect(expected: PartitionState): Unit = {
+    replayMocks()
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    verifyMocks()
+    assertEquals(expected, partitionState(partition))
+  }
+
+  // ---------------------------------------------------------------------------------------------
+  // Transitions starting from a partition with no recorded state
+  // ---------------------------------------------------------------------------------------------
+
+  @Test
+  def testNonexistentPartitionToNewPartitionTransition(): Unit =
+    assertPlainTransition(None, NewPartition, NewPartition)
+
+  @Test
+  def testInvalidNonexistentPartitionToOnlinePartitionTransition(): Unit = {
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    val observed = partitionState(partition)
+    assertEquals(NonExistentPartition, observed)
+  }
+
+  @Test
+  def testInvalidNonexistentPartitionToOfflinePartitionTransition(): Unit =
+    assertPlainTransition(None, OfflinePartition, NonExistentPartition)
+
+  // ---------------------------------------------------------------------------------------------
+  // Transitions starting from NewPartition
+  // ---------------------------------------------------------------------------------------------
+
+  /** Creating the partition state succeeds: a LeaderAndIsr request (isNew = true) is queued and the partition goes online. */
+  @Test
+  def testNewPartitionToOnlinePartitionTransition(): Unit = {
+    val initialLeadership = seedNewPartition()
+    expectNewBatch()
+    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> initialLeadership)))
+      .andReturn(Seq(CreateResponse(Code.OK.intValue(), null, partition, null)))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, initialLeadership, Seq(brokerId), isNew = true))
+    expectBatchSent()
+    electWithOfflineStrategyAndExpect(OnlinePartition)
+  }
+
+  /** Creating the partition state throws: the partition is asserted to remain NewPartition. */
+  @Test
+  def testNewPartitionToOnlinePartitionTransitionZkUtilsExceptionFromCreateStates(): Unit = {
+    val initialLeadership = seedNewPartition()
+    expectNewBatch()
+    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> initialLeadership)))
+      .andThrow(new ZookeeperClientException("test"))
+    expectBatchSent()
+    electWithOfflineStrategyAndExpect(NewPartition)
+  }
+
+  /** Creating the partition state answers NODEEXISTS: the partition is asserted to remain NewPartition. */
+  @Test
+  def testNewPartitionToOnlinePartitionTransitionErrorCodeFromCreateStates(): Unit = {
+    val initialLeadership = seedNewPartition()
+    expectNewBatch()
+    EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> initialLeadership)))
+      .andReturn(Seq(CreateResponse(Code.NODEEXISTS.intValue(), null, partition, null)))
+    expectBatchSent()
+    electWithOfflineStrategyAndExpect(NewPartition)
+  }
+
+  @Test
+  def testNewPartitionToOfflinePartitionTransition(): Unit =
+    assertPlainTransition(Some(NewPartition), OfflinePartition, OfflinePartition)
+
+  @Test
+  def testInvalidNewPartitionToNonexistentPartitionTransition(): Unit =
+    assertPlainTransition(Some(NewPartition), NonExistentPartition, NewPartition)
+
+  // ---------------------------------------------------------------------------------------------
+  // Transitions starting from OnlinePartition
+  // ---------------------------------------------------------------------------------------------
+
+  /** Re-election with the preferred-replica strategy on a single-replica partition. */
+  @Test
+  def testOnlinePartitionToOnlineTransition(): Unit = {
+    useSingleLiveReplica()
+    partitionState.put(partition, OnlinePartition)
+    val currentLeaderAndIsr = LeaderAndIsr(brokerId, List(brokerId))
+    val currentLeadership = LeaderIsrAndControllerEpoch(currentLeaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, currentLeadership)
+
+    expectNewBatch()
+    expectStateLookup(Code.OK, currentLeadership)
+    val persisted = expectLeaderAndIsrUpdate(currentLeaderAndIsr.newLeader(brokerId))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, LeaderIsrAndControllerEpoch(persisted, controllerEpoch),
+      Seq(brokerId), isNew = false))
+    expectBatchSent()
+    replayMocks()
+
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
+    verifyMocks()
+    assertEquals(OnlinePartition, partitionState(partition))
+  }
+
+  /** The current leader is shutting down: leadership and ISR are expected to move to the other broker. */
+  @Test
+  def testOnlinePartitionToOnlineTransitionForControlledShutdown(): Unit = {
+    val otherBrokerId = brokerId + 1
+    controllerContext.liveBrokers = Set(TestUtils.createBroker(brokerId, "host", 0), TestUtils.createBroker(otherBrokerId, "host", 0))
+    controllerContext.shuttingDownBrokerIds.add(brokerId)
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId, otherBrokerId))
+    partitionState.put(partition, OnlinePartition)
+    val currentLeaderAndIsr = LeaderAndIsr(brokerId, List(brokerId, otherBrokerId))
+    val currentLeadership = LeaderIsrAndControllerEpoch(currentLeaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, currentLeadership)
+
+    expectNewBatch()
+    expectStateLookup(Code.OK, currentLeadership)
+    val persisted = expectLeaderAndIsrUpdate(currentLeaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId)))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
+      partition.topic, partition.partition, LeaderIsrAndControllerEpoch(persisted, controllerEpoch),
+      Seq(brokerId, otherBrokerId), isNew = false))
+    expectBatchSent()
+    replayMocks()
+
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy))
+    verifyMocks()
+    assertEquals(OnlinePartition, partitionState(partition))
+  }
+
+  @Test
+  def testOnlinePartitionToOfflineTransition(): Unit =
+    assertPlainTransition(Some(OnlinePartition), OfflinePartition, OfflinePartition)
+
+  @Test
+  def testInvalidOnlinePartitionToNonexistentPartitionTransition(): Unit =
+    assertPlainTransition(Some(OnlinePartition), NonExistentPartition, OnlinePartition)
+
+  @Test
+  def testInvalidOnlinePartitionToNewPartitionTransition(): Unit =
+    assertPlainTransition(Some(OnlinePartition), NewPartition, OnlinePartition)
+
+  // ---------------------------------------------------------------------------------------------
+  // Transitions starting from OfflinePartition
+  // ---------------------------------------------------------------------------------------------
+
+  /** State lookup, log-config lookup and the leadership update all succeed: the partition goes online. */
+  @Test
+  def testOfflinePartitionToOnlinePartitionTransition(): Unit = {
+    val (leaderless, leadership) = seedOfflineWithoutLeader()
+
+    expectNewBatch()
+    expectStateLookup(Code.OK, leadership)
+    EasyMock.expect(mockZkUtils.getLogConfigs(Seq.empty, config.originals()))
+      .andReturn((Map(partition.topic -> LogConfig()), Map.empty))
+    val persisted = expectLeaderAndIsrUpdate(leaderless.newLeader(brokerId))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, LeaderIsrAndControllerEpoch(persisted, controllerEpoch), Seq(brokerId), isNew = false))
+    expectBatchSent()
+
+    electWithOfflineStrategyAndExpect(OnlinePartition)
+  }
+
+  /** The state lookup throws: the partition is asserted to remain OfflinePartition. */
+  @Test
+  def testOfflinePartitionToOnlinePartitionTransitionZkUtilsExceptionFromStateLookup(): Unit = {
+    seedOfflineWithoutLeader()
+
+    expectNewBatch()
+    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+      .andThrow(new ZookeeperClientException(""))
+    expectBatchSent()
+
+    electWithOfflineStrategyAndExpect(OfflinePartition)
+  }
+
+  /** The state lookup answers NONODE: the partition is asserted to remain OfflinePartition. */
+  @Test
+  def testOfflinePartitionToOnlinePartitionTransitionErrorCodeFromStateLookup(): Unit = {
+    val (_, leadership) = seedOfflineWithoutLeader()
+
+    expectNewBatch()
+    expectStateLookup(Code.NONODE, leadership)
+    expectBatchSent()
+
+    electWithOfflineStrategyAndExpect(OfflinePartition)
+  }
+
+  @Test
+  def testOfflinePartitionToNonexistentPartitionTransition(): Unit =
+    assertPlainTransition(Some(OfflinePartition), NonExistentPartition, NonExistentPartition)
+
+  @Test
+  def testInvalidOfflinePartitionToNewPartitionTransition(): Unit =
+    assertPlainTransition(Some(OfflinePartition), NewPartition, OfflinePartition)
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
new file mode 100644
index 0000000..62c28a0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.data.Stat
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.collection.mutable
+
+class ReplicaStateMachineTest extends JUnitSuite {
+  // Collaborators and the state map under test; each is rebuilt by setUp() before every test.
+  private var controllerContext: ControllerContext = _
+  private var mockZkUtils: KafkaControllerZkUtils = _
+  private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = _
+  private var mockTopicDeletionManager: TopicDeletionManager = _
+  private var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = _
+  private var replicaStateMachine: ReplicaStateMachine = _
+
+  // Fixed test data: broker 5 hosting one replica of partition ("t", 0).
+  private val brokerId = 5
+  private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
+  private val controllerEpoch = 50
+  private val partition = TopicAndPartition("t", 0)
+  private val partitions = Seq(partition)
+  private val replica = PartitionAndReplica(partition.topic, partition.partition, brokerId)
+  private val replicas = Seq(replica)
+
+  /** Builds a fresh context, fresh mocks, an empty replica-state map and the state machine under test. */
+  @Before
+  def setUp(): Unit = {
+    controllerContext = new ControllerContext
+    controllerContext.epoch = controllerEpoch
+    mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils])
+    mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
+    mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
+    replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
+    val stateChangeLogger = new StateChangeLogger(brokerId, true, None)
+    replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext,
+      mockTopicDeletionManager, mockZkUtils, replicaState, mockControllerBrokerRequestBatch)
+  }
+
+  /** With no state recorded for the replica, requesting NewReplica must store NewReplica. */
+  @Test
+  def testNonexistentReplicaToNewReplicaTransition(): Unit = {
+    val requested = NewReplica
+    replicaStateMachine.handleStateChanges(replicas, requested)
+    val observed = replicaState(replica)
+    assertEquals(requested, observed)
+  }
+
+  /** With no state recorded for the replica, requesting OnlineReplica must leave it at NonExistentReplica. */
+  @Test
+  def testInvalidNonexistentReplicaToOnlineReplicaTransition(): Unit = {
+    val requested = OnlineReplica
+    replicaStateMachine.handleStateChanges(replicas, requested)
+    val observed = replicaState(replica)
+    assertEquals(NonExistentReplica, observed)
+  }
+
+  /** With no state recorded for the replica, requesting OfflineReplica must leave it at NonExistentReplica. */
+  @Test
+  def testInvalidNonexistentReplicaToOfflineReplicaTransition(): Unit = {
+    val requested = OfflineReplica
+    replicaStateMachine.handleStateChanges(replicas, requested)
+    val observed = replicaState(replica)
+    assertEquals(NonExistentReplica, observed)
+  }
+
+  /** With no state recorded for the replica, requesting ReplicaDeletionStarted must leave it at NonExistentReplica. */
+  @Test
+  def testInvalidNonexistentReplicaToReplicaDeletionStartedTransition(): Unit = {
+    val requested = ReplicaDeletionStarted
+    replicaStateMachine.handleStateChanges(replicas, requested)
+    val observed = replicaState(replica)
+    assertEquals(NonExistentReplica, observed)
+  }
+
+  /** With no state recorded for the replica, requesting ReplicaDeletionIneligible must leave it at NonExistentReplica. */
+  @Test
+  def testInvalidNonexistentReplicaToReplicaDeletionIneligibleTransition(): Unit = {
+    val requested = ReplicaDeletionIneligible
+    replicaStateMachine.handleStateChanges(replicas, requested)
+    val observed = replicaState(replica)
+    assertEquals(NonExistentReplica, observed)
+  }
+
+  /** With no state recorded for the replica, requesting ReplicaDeletionSuccessful must leave it at NonExistentReplica. */
+  @Test
+  def testInvalidNonexistentReplicaToReplicaDeletionSuccessfulTransition(): Unit = {
+    val requested = ReplicaDeletionSuccessful
+    replicaStateMachine.handleStateChanges(replicas, requested)
+    val observed = replicaState(replica)
+    assertEquals(NonExistentReplica, observed)
+  }
+
+  /** Per the test name, NewReplica -> NonExistentReplica must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidNewReplicaToNonexistentReplicaTransition(): Unit =
+    testInvalidTransition(NewReplica, NonExistentReplica)
+
+  /** A NewReplica that is part of the partition's assignment may move to OnlineReplica. */
+  @Test
+  def testNewReplicaToOnlineReplicaTransition(): Unit = {
+    replicaState(replica) = NewReplica
+    controllerContext.partitionReplicaAssignment(partition) = Seq(brokerId)
+    val requested = OnlineReplica
+    replicaStateMachine.handleStateChanges(replicas, requested)
+    assertEquals(requested, replicaState(replica))
+  }
+
+  /**
+   * Requests OfflineReplica for a NewReplica. A stop-replica request for the broker is expected
+   * to be queued and the batch sent; the recorded state is asserted to still be NewReplica.
+   */
+  @Test
+  def testNewReplicaToOfflineReplicaTransition(): Unit = {
+    replicaState(replica) = NewReplica
+    val requestBatch = mockControllerBrokerRequestBatch
+    EasyMock.expect(requestBatch.newBatch())
+    EasyMock.expect(requestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, false, null))
+    EasyMock.expect(requestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(requestBatch)
+
+    replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
+
+    EasyMock.verify(requestBatch)
+    val observed = replicaState(replica)
+    assertEquals(NewReplica, observed)
+  }
+
+  /** Per the test name, NewReplica -> ReplicaDeletionStarted must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidNewReplicaToReplicaDeletionStartedTransition(): Unit =
+    testInvalidTransition(NewReplica, ReplicaDeletionStarted)
+
+  /** Per the test name, NewReplica -> ReplicaDeletionIneligible must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidNewReplicaToReplicaDeletionIneligibleTransition(): Unit =
+    testInvalidTransition(NewReplica, ReplicaDeletionIneligible)
+
+  /** Per the test name, NewReplica -> ReplicaDeletionSuccessful must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidNewReplicaToReplicaDeletionSuccessfulTransition(): Unit =
+    testInvalidTransition(NewReplica, ReplicaDeletionSuccessful)
+
+  /** Per the test name, OnlineReplica -> NonExistentReplica must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidOnlineReplicaToNonexistentReplicaTransition(): Unit =
+    testInvalidTransition(OnlineReplica, NonExistentReplica)
+
+  /** Per the test name, OnlineReplica -> NewReplica must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidOnlineReplicaToNewReplicaTransition(): Unit =
+    testInvalidTransition(OnlineReplica, NewReplica)
+
+  /**
+   * Re-requests OnlineReplica for a replica that is already online and leads the partition.
+   * The stored leadership is expected to be sent to the broker in a LeaderAndIsr request
+   * (isNew = false), and the recorded state must still be OnlineReplica.
+   */
+  @Test
+  def testOnlineReplicaToOnlineReplicaTransition(): Unit = {
+    replicaState(replica) = OnlineReplica
+    controllerContext.partitionReplicaAssignment(partition) = Seq(brokerId)
+    val storedLeadership = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
+    controllerContext.partitionLeadershipInfo(partition) = storedLeadership
+
+    val requestBatch = mockControllerBrokerRequestBatch
+    EasyMock.expect(requestBatch.newBatch())
+    EasyMock.expect(requestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, storedLeadership, Seq(brokerId), isNew = false))
+    EasyMock.expect(requestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, requestBatch)
+
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+
+    EasyMock.verify(mockZkUtils, requestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))
+  }
+
+  /**
+   * Takes the leader replica of a two-replica partition offline.
+   *
+   * Expected interactions: a stop-replica request for `brokerId`, a lookup of the partition state,
+   * an `updateLeaderAndIsr` call that removes the leader and shrinks the ISR to the other broker,
+   * a deletion check on the topic deletion manager, and a LeaderAndIsr request (isNew = false)
+   * to the remaining broker. Afterwards the controller context must hold the updated leadership
+   * and the replica must be recorded as OfflineReplica.
+   */
+  @Test
+  def testOnlineReplicaToOfflineReplicaTransition(): Unit = {
+    val otherBrokerId = brokerId + 1
+    val replicaIds = List(brokerId, otherBrokerId)
+    replicaState.put(replica, OnlineReplica)
+    controllerContext.partitionReplicaAssignment.put(partition, replicaIds)
+    val leaderAndIsr = LeaderAndIsr(brokerId, replicaIds)
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+
+    val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, false, null))
+    val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId))
+    val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr.zkVersion + 1)
+    val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch)
+    EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions))
+      .andReturn(Seq(GetDataResponse(Code.OK.intValue(), null, partition,
+        TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
+    // Pass the 3-tuple explicitly instead of relying on argument-list adaptation (auto-tupling),
+    // which is how PartitionStateMachineTest stubs the same call.
+    EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch))
+      .andReturn((Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
+    EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
+      partition.topic, partition.partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
+    replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
+    assertEquals(updatedLeaderIsrAndControllerEpoch, controllerContext.partitionLeadershipInfo(partition))
+    assertEquals(OfflineReplica, replicaState(replica))
+  }
+
+  /** Per the test name, OnlineReplica -> ReplicaDeletionStarted must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidOnlineReplicaToReplicaDeletionStartedTransition(): Unit =
+    testInvalidTransition(OnlineReplica, ReplicaDeletionStarted)
+
+  /** Per the test name, OnlineReplica -> ReplicaDeletionIneligible must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidOnlineReplicaToReplicaDeletionIneligibleTransition(): Unit =
+    testInvalidTransition(OnlineReplica, ReplicaDeletionIneligible)
+
+  /** Per the test name, OnlineReplica -> ReplicaDeletionSuccessful must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidOnlineReplicaToReplicaDeletionSuccessfulTransition(): Unit =
+    testInvalidTransition(OnlineReplica, ReplicaDeletionSuccessful)
+
+  /** Per the test name, OfflineReplica -> NonExistentReplica must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidOfflineReplicaToNonexistentReplicaTransition(): Unit =
+    testInvalidTransition(OfflineReplica, NonExistentReplica)
+
+  /** Per the test name, OfflineReplica -> NewReplica must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidOfflineReplicaToNewReplicaTransition(): Unit =
+    testInvalidTransition(OfflineReplica, NewReplica)
+
+  /**
+   * Brings an OfflineReplica back online. The stored leadership is expected to be sent to the
+   * broker in a LeaderAndIsr request (isNew = false), and the replica must end up OnlineReplica.
+   */
+  @Test
+  def testOfflineReplicaToOnlineReplicaTransition(): Unit = {
+    replicaState(replica) = OfflineReplica
+    controllerContext.partitionReplicaAssignment(partition) = Seq(brokerId)
+    val storedLeadership = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
+    controllerContext.partitionLeadershipInfo(partition) = storedLeadership
+
+    val requestBatch = mockControllerBrokerRequestBatch
+    EasyMock.expect(requestBatch.newBatch())
+    EasyMock.expect(requestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, storedLeadership, Seq(brokerId), isNew = false))
+    EasyMock.expect(requestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, requestBatch)
+
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+
+    EasyMock.verify(mockZkUtils, requestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))
+  }
+
+  /**
+   * Starts deletion of an OfflineReplica. A stop-replica request carrying `true` and the
+   * callbacks' stopReplicaResponseCallback is expected, and the replica must end up
+   * ReplicaDeletionStarted.
+   */
+  @Test
+  def testOfflineReplicaToReplicaDeletionStartedTransition(): Unit = {
+    val callbacks = (new Callbacks.CallbackBuilder).build
+    replicaState(replica) = OfflineReplica
+
+    val requestBatch = mockControllerBrokerRequestBatch
+    EasyMock.expect(requestBatch.newBatch())
+    EasyMock.expect(requestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, true, callbacks.stopReplicaResponseCallback))
+    EasyMock.expect(requestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, requestBatch)
+
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted, callbacks)
+
+    EasyMock.verify(mockZkUtils, requestBatch)
+    assertEquals(ReplicaDeletionStarted, replicaState(replica))
+  }
+
+  /** Per the test name, OfflineReplica -> ReplicaDeletionIneligible must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidOfflineReplicaToReplicaDeletionIneligibleTransition(): Unit =
+    testInvalidTransition(OfflineReplica, ReplicaDeletionIneligible)
+
+  /** Per the test name, OfflineReplica -> ReplicaDeletionSuccessful must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidOfflineReplicaToReplicaDeletionSuccessfulTransition(): Unit =
+    testInvalidTransition(OfflineReplica, ReplicaDeletionSuccessful)
+
+  /** Per the test name, ReplicaDeletionStarted -> NonExistentReplica must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidReplicaDeletionStartedToNonexistentReplicaTransition(): Unit =
+    testInvalidTransition(ReplicaDeletionStarted, NonExistentReplica)
+
+  /** Per the test name, ReplicaDeletionStarted -> NewReplica must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidReplicaDeletionStartedToNewReplicaTransition(): Unit =
+    testInvalidTransition(ReplicaDeletionStarted, NewReplica)
+
+  /** Per the test name, ReplicaDeletionStarted -> OnlineReplica must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidReplicaDeletionStartedToOnlineReplicaTransition(): Unit =
+    testInvalidTransition(ReplicaDeletionStarted, OnlineReplica)
+
+  /** Per the test name, ReplicaDeletionStarted -> OfflineReplica must be rejected; the check lives in testInvalidTransition. */
+  @Test
+  def testInvalidReplicaDeletionStartedToOfflineReplicaTransition(): Unit =
+    testInvalidTransition(ReplicaDeletionStarted, OfflineReplica)
+
+  @Test
+  def testReplicaDeletionStartedToReplicaDeletionIneligibleTransition(): Unit = {
+    replicaState.put(replica, ReplicaDeletionStarted)
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionIneligible)
+    assertEquals(ReplicaDeletionIneligible, replicaState(replica))
+  }
+
+  @Test
+  def testReplicaDeletionStartedToReplicaDeletionSuccessfulTransition(): Unit = {
+    replicaState.put(replica, ReplicaDeletionStarted)
+    replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionSuccessful)
+    assertEquals(ReplicaDeletionSuccessful, replicaState(replica))
+  }
+
+  @Test
+  def testReplicaDeletionSuccessfulToNonexistentReplicaTransition(): Unit = {
+    replicaState.put(replica, ReplicaDeletionSuccessful)
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    replicaStateMachine.handleStateChanges(replicas, NonExistentReplica)
+    assertEquals(Seq.empty, controllerContext.partitionReplicaAssignment(partition))
+    assertEquals(None, replicaState.get(replica))
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToNewReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionSuccessful, NewReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToOnlineReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionSuccessful, OnlineReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToOfflineReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionSuccessful, OfflineReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToReplicaDeletionStartedTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionSuccessful, ReplicaDeletionStarted)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionSuccessfulToReplicaDeletionIneligibleTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionIneligibleToNonexistentReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionIneligible, NonExistentReplica)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionIneligibleToNewReplicaTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionIneligible, NewReplica)
+  }
+
+  @Test
+  def testReplicaDeletionIneligibleToOnlineReplicaTransition(): Unit = {
+    replicaState.put(replica, ReplicaDeletionIneligible)
+    controllerContext.partitionReplicaAssignment.put(partition, Seq(brokerId))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
+    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
+      partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch)
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))
+  }
+
+  @Test
+  def testInvalidReplicaDeletionIneligibleToReplicaDeletionStartedTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionIneligible, ReplicaDeletionStarted)
+  }
+
+  @Test
+  def testInvalidReplicaDeletionIneligibleToReplicaDeletionSuccessfulTransition(): Unit = {
+    testInvalidTransition(ReplicaDeletionIneligible, ReplicaDeletionSuccessful)
+  }
+
+  private def testInvalidTransition(fromState: ReplicaState, toState: ReplicaState): Unit = {
+    replicaState.put(replica, fromState)
+    replicaStateMachine.handleStateChanges(replicas, toState)
+    assertEquals(fromState, replicaState(replica))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
index 3f80c28..9f172f0 100644
--- a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala
@@ -229,17 +229,18 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation = {
+      override def handleCreation(): Unit = {
         znodeChangeHandlerCountDownLatch.countDown()
       }
-      override def handleDeletion = {}
-      override def handleDataChange = {}
       override val path: String = mockPath
     }
 
     zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
+    val existsRequest = ExistsRequest(mockPath, null)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)
+    val responses = zookeeperClient.handle(Seq(existsRequest, createRequest))
+    assertEquals("Response code for exists should be NONODE", Code.NONODE, Code.get(responses.head.rc))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(responses.last.rc))
     assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
   }
 
@@ -249,17 +250,18 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation = {}
-      override def handleDeletion = {
+      override def handleDeletion(): Unit = {
         znodeChangeHandlerCountDownLatch.countDown()
       }
-      override def handleDataChange = {}
       override val path: String = mockPath
     }
 
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
     zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath, null)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)
+    val responses = zookeeperClient.handle(Seq(createRequest, existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(responses.head.rc))
+    assertEquals("Response code for exists should be OK", Code.OK, Code.get(responses.last.rc))
     val deleteResponse = zookeeperClient.handle(DeleteRequest(mockPath, -1, null)).asInstanceOf[DeleteResponse]
     assertEquals("Response code for delete should be OK", Code.OK, Code.get(deleteResponse.rc))
     assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
@@ -271,17 +273,18 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
     val znodeChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChangeHandler = new ZNodeChangeHandler {
-      override def handleCreation = {}
-      override def handleDeletion = {}
-      override def handleDataChange = {
+      override def handleDataChange(): Unit = {
         znodeChangeHandlerCountDownLatch.countDown()
       }
       override val path: String = mockPath
     }
 
-    val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
-    assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
     zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(mockPath, null)
+    val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null)
+    val responses = zookeeperClient.handle(Seq(createRequest, existsRequest))
+    assertEquals("Response code for create should be OK", Code.OK, Code.get(responses.head.rc))
+    assertEquals("Response code for exists should be OK", Code.OK, Code.get(responses.last.rc))
     val setDataResponse = zookeeperClient.handle(SetDataRequest(mockPath, Array.empty[Byte], -1, null)).asInstanceOf[SetDataResponse]
     assertEquals("Response code for setData should be OK", Code.OK, Code.get(setDataResponse.rc))
     assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
@@ -293,7 +296,7 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null)
     val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1)
     val zNodeChildChangeHandler = new ZNodeChildChangeHandler {
-      override def handleChildChange = {
+      override def handleChildChange(): Unit = {
         zNodeChildChangeHandlerCountDownLatch.countDown()
       }
       override val path: String = mockPath
@@ -304,6 +307,8 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     val createResponse = zookeeperClient.handle(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
     assertEquals("Response code for create should be OK", Code.OK, Code.get(createResponse.rc))
     zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+    val getChildrenResponse = zookeeperClient.handle(GetChildrenRequest(mockPath, null)).asInstanceOf[GetChildrenResponse]
+    assertEquals("Response code for getChildren should be OK", Code.OK, Code.get(getChildrenResponse.rc))
     val createResponseChild1 = zookeeperClient.handle(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT, null))
     assertEquals("Response code for create child1 should be OK", Code.OK, Code.get(createResponseChild1.rc))
     assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))
@@ -314,12 +319,9 @@ class ZookeeperClientTest extends ZooKeeperTestHarness {
     System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf")
     val stateChangeHandlerCountDownLatch = new CountDownLatch(1)
     val stateChangeHandler = new StateChangeHandler {
-      override def beforeInitializingSession = {}
-      override def afterInitializingSession = {}
-      override def onAuthFailure = {
+      override def onAuthFailure(): Unit = {
         stateChangeHandlerCountDownLatch.countDown()
       }
-      override def onConnectionTimeout = {}
     }
     new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler)
     assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS))

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
deleted file mode 100644
index 4b90767..0000000
--- a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.server
-
-import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
-import kafka.controller.{ControlledShutdownLeaderSelector, ControllerContext}
-import org.easymock.EasyMock
-import org.junit.{Assert, Test}
-import Assert._
-import kafka.cluster.Broker
-import kafka.utils.ZkUtils
-
-import scala.collection.mutable
-
-class ControlledShutdownLeaderSelectorTest {
-
-  @Test
-  def testSelectLeader() {
-    val topicPartition = TopicAndPartition("topic", 1)
-    val assignment = Seq(6, 5, 4, 3, 2, 1)
-    val preferredReplicaId = assignment.head
-
-    val firstIsr = List(1, 3, 6)
-    val firstLeader = 1
-
-    val zkUtils = EasyMock.mock(classOf[ZkUtils])
-    val controllerContext = new ControllerContext(zkUtils)
-    controllerContext.liveBrokers = assignment.map(Broker(_, Seq.empty, None)).toSet
-    controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3)
-    controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment)
-
-    val leaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-    val firstLeaderAndIsr = LeaderAndIsr(firstLeader, firstIsr)
-    val (secondLeaderAndIsr, secondReplicas) = leaderSelector.selectLeader(topicPartition, firstLeaderAndIsr)
-
-    assertEquals(preferredReplicaId, secondLeaderAndIsr.leader)
-    assertEquals(Seq(1, 6), secondLeaderAndIsr.isr)
-    assertEquals(1, secondLeaderAndIsr.zkVersion)
-    assertEquals(1, secondLeaderAndIsr.leaderEpoch)
-    assertEquals(assignment, secondReplicas)
-
-    controllerContext.shuttingDownBrokerIds += preferredReplicaId
-
-    val deadBrokerId = 2
-    controllerContext.liveBrokers = controllerContext.liveOrShuttingDownBrokers.filter(_.id != deadBrokerId)
-    controllerContext.shuttingDownBrokerIds -= deadBrokerId
-
-    val (thirdLeaderAndIsr, thirdReplicas) = leaderSelector.selectLeader(topicPartition, secondLeaderAndIsr)
-
-    assertEquals(1, thirdLeaderAndIsr.leader)
-    assertEquals(Seq(1), thirdLeaderAndIsr.isr)
-    assertEquals(2, thirdLeaderAndIsr.zkVersion)
-    assertEquals(2, thirdLeaderAndIsr.leaderEpoch)
-    assertEquals(Seq(6, 5, 4, 3, 1), thirdReplicas)
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 1dd0808..56f5f6d 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -136,7 +136,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
       securityProtocol))
     val nodes = brokers.map(_.getNode(listenerName))
 
-    val controllerContext = new ControllerContext(zkUtils)
+    val controllerContext = new ControllerContext
     controllerContext.liveBrokers = brokers.toSet
     val metrics = new Metrics
     val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM,


[4/5] kafka git commit: KAFKA-5642; Use async ZookeeperClient in Controller

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 811ff67..d4a012e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,31 +16,28 @@
  */
 package kafka.controller
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import com.yammer.metrics.core.Gauge
-import kafka.admin.{AdminUtils, PreferredReplicaLeaderElectionCommand}
+import kafka.admin.AdminOperationException
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
-import kafka.log.LogConfig
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server._
-import kafka.utils.ZkUtils._
 import kafka.utils._
-import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
-import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener}
 import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, StopReplicaResponse, LeaderAndIsrResponse}
+import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, LeaderAndIsrResponse, StopReplicaResponse}
 import org.apache.kafka.common.utils.Time
-import org.apache.zookeeper.Watcher.Event.KeeperState
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.{Code, NodeExistsException}
 
 import scala.collection._
 import scala.util.Try
 
-class ControllerContext(val zkUtils: ZkUtils) {
+class ControllerContext {
   val stats = new ControllerStats
 
   var controllerChannelManager: ControllerChannelManager = null
@@ -133,33 +130,13 @@ object KafkaController extends Logging {
   val InitialControllerEpoch = 1
   val InitialControllerEpochZkVersion = 1
 
-  def parseControllerId(controllerInfoString: String): Int = {
-    try {
-      Json.parseFull(controllerInfoString) match {
-        case Some(js) => js.asJsonObject("brokerid").to[Int]
-        case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
-      }
-    } catch {
-      case _: Throwable =>
-        // It may be due to an incompatible controller register version
-        warn("Failed to parse the controller info as json. "
-          + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
-        try controllerInfoString.toInt
-        catch {
-          case t: Throwable => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
-        }
-    }
-  }
 }
 
-class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
-
+class KafkaController(val config: KafkaConfig, kafkaControllerZkUtils: KafkaControllerZkUtils, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
   this.logIdent = s"[Controller id=${config.brokerId}] "
 
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
-  val controllerContext = new ControllerContext(zkUtils)
-  val partitionStateMachine = new PartitionStateMachine(this, stateChangeLogger)
-  val replicaStateMachine = new ReplicaStateMachine(this, stateChangeLogger)
+  val controllerContext = new ControllerContext
 
   // have a separate scheduler for the controller to be able to start and stop independently of the kafka server
   // visible for testing
@@ -169,21 +146,20 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   private[controller] val eventManager = new ControllerEventManager(controllerContext.stats.rateAndTimeMetrics,
     _ => updateMetrics())
 
-  val topicDeletionManager = new TopicDeletionManager(this, eventManager)
-  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
-  private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
-  private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
-  private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
+  val topicDeletionManager = new TopicDeletionManager(this, eventManager, kafkaControllerZkUtils)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, stateChangeLogger)
-
-  private val brokerChangeListener = new BrokerChangeListener(this, eventManager)
-  private val topicChangeListener = new TopicChangeListener(this, eventManager)
-  private val topicDeletionListener = new TopicDeletionListener(this, eventManager)
-  private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty
-  private val partitionReassignmentListener = new PartitionReassignmentListener(this, eventManager)
-  private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this, eventManager)
-  private val isrChangeNotificationListener = new IsrChangeNotificationListener(this, eventManager)
-  private val logDirEventNotificationListener = new LogDirEventNotificationListener(this, eventManager)
+  val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, kafkaControllerZkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+  val partitionStateMachine = new PartitionStateMachine(config, stateChangeLogger, controllerContext, topicDeletionManager, kafkaControllerZkUtils, mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+
+  private val controllerChangeHandler = new ControllerChangeHandler(this, eventManager)
+  private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
+  private val topicChangeHandler = new TopicChangeHandler(this, eventManager)
+  private val topicDeletionHandler = new TopicDeletionHandler(this, eventManager)
+  private val partitionModificationsHandlers: mutable.Map[String, PartitionModificationsHandler] = mutable.Map.empty
+  private val partitionReassignmentHandler = new PartitionReassignmentHandler(this, eventManager)
+  private val preferredReplicaElectionHandler = new PreferredReplicaElectionHandler(this, eventManager)
+  private val isrChangeNotificationHandler = new IsrChangeNotificationHandler(this, eventManager)
+  private val logDirEventNotificationHandler = new LogDirEventNotificationHandler(this, eventManager)
 
   @volatile private var activeControllerId = -1
   @volatile private var offlinePartitionCount = 0
@@ -270,35 +246,40 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
    * This ensures another controller election will be triggered and there will always be an actively serving controller
    */
   def onControllerFailover() {
-    info("Starting become controller state transition")
+    info("Reading controller epoch from zookeeper")
     readControllerEpochFromZookeeper()
+    info("Incrementing controller epoch in zookeeper")
     incrementControllerEpoch()
-    LogDirUtils.deleteLogDirEvents(zkUtils)
-
+    info("Registering handlers")
     // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
-    registerPartitionReassignmentListener()
-    registerIsrChangeNotificationListener()
-    registerPreferredReplicaElectionListener()
-    registerTopicChangeListener()
-    registerTopicDeletionListener()
-    registerBrokerChangeListener()
-    registerLogDirEventNotificationListener()
-
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(brokerChangeHandler)
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(topicChangeHandler)
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(topicDeletionHandler)
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(logDirEventNotificationHandler)
+    kafkaControllerZkUtils.registerZNodeChildChangeHandler(isrChangeNotificationHandler)
+    kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)
+    kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
+    info("Deleting log dir event notifications")
+    kafkaControllerZkUtils.deleteLogDirEventNotifications()
+    info("Deleting isr change notifications")
+    kafkaControllerZkUtils.deleteIsrChangeNotifications()
+    info("Initializing controller context")
     initializeControllerContext()
+    info("Fetching topic deletions in progress")
     val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
+    info("Initializing topic deletion manager")
     topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
 
     // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
     // are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
     // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
     // partitionStateMachine.startup().
+    info("Sending update metadata request")
     sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
 
     replicaStateMachine.startup()
     partitionStateMachine.startup()
 
-    // register the partition change listeners for all existing topics on failover
-    controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
     info(s"Ready to serve as the new controller with epoch $epoch")
     maybeTriggerPartitionReassignment()
     topicDeletionManager.tryTopicDeletion()
@@ -323,10 +304,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def onControllerResignation() {
     debug("Resigning")
     // de-register listeners
-    deregisterIsrChangeNotificationListener()
-    deregisterPartitionReassignmentListener()
-    deregisterPreferredReplicaElectionListener()
-    deregisterLogDirEventNotificationListener()
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(isrChangeNotificationHandler.path)
+    kafkaControllerZkUtils.unregisterZNodeChangeHandler(partitionReassignmentHandler.path)
+    kafkaControllerZkUtils.unregisterZNodeChangeHandler(preferredReplicaElectionHandler.path)
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(logDirEventNotificationHandler.path)
 
     // reset topic deletion manager
     topicDeletionManager.reset()
@@ -339,15 +320,15 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     globalPartitionCount = 0
 
     // de-register partition ISR listener for on-going partition reassignment task
-    deregisterPartitionReassignmentIsrChangeListeners()
+    unregisterPartitionReassignmentIsrChangeHandlers()
     // shutdown partition state machine
     partitionStateMachine.shutdown()
-    deregisterTopicChangeListener()
-    partitionModificationsListeners.keys.foreach(deregisterPartitionModificationsListener)
-    deregisterTopicDeletionListener()
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(topicChangeHandler.path)
+    unregisterPartitionModificationsHandlers(partitionModificationsHandlers.keys.toSeq)
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(topicDeletionHandler.path)
     // shutdown replica state machine
     replicaStateMachine.shutdown()
-    deregisterBrokerChangeListener()
+    kafkaControllerZkUtils.unregisterZNodeChildChangeHandler(brokerChangeHandler.path)
 
     resetControllerContext()
 
@@ -367,7 +348,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def onBrokerLogDirFailure(brokerIds: Seq[Int]) {
     // send LeaderAndIsrRequest for all replicas on those brokers to see if they are still online.
     val replicasOnBrokers = controllerContext.replicasOnBrokers(brokerIds.toSet)
-    replicaStateMachine.handleStateChanges(replicasOnBrokers, OnlineReplica)
+    replicaStateMachine.handleStateChanges(replicasOnBrokers.toSeq, OnlineReplica)
   }
 
   /**
@@ -396,7 +377,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
-    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
+    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
     // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
     // to see if these brokers can become leaders for some/all of those
     partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -450,11 +431,11 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
         !topicDeletionManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
 
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
-    partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
+    partitionStateMachine.handleStateChanges(partitionsWithoutLeader.toSeq, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // trigger OfflineReplica state change for those newly offline replicas
-    replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion, OfflineReplica)
+    replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
 
     // fail deletion of topics that are affected by the offline replicas
     if (newOfflineReplicasForDeletion.nonEmpty) {
@@ -472,20 +453,6 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   /**
-   * This callback is invoked by the partition state machine's topic change listener with the list of new topics
-   * and partitions as input. It does the following -
-   * 1. Registers partition change listener. This is not required until KAFKA-347
-   * 2. Invokes the new partition callback
-   * 3. Send metadata request with the new topic to all brokers so they allow requests for that topic to be served
-   */
-  def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
-    info("New topic creation callback for %s".format(newPartitions.mkString(",")))
-    // subscribe to partition changes
-    topics.foreach(topic => registerPartitionModificationsListener(topic))
-    onNewPartitionCreation(newPartitions)
-  }
-
-  /**
    * This callback is invoked by the topic change callback with the set of new partitions as input.
    * It does the following -
    * 1. Move the newly created partitions to the NewPartition state
@@ -493,10 +460,10 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
    */
   def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
-    partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
-    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
-    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
-    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
+    partitionStateMachine.handleStateChanges(newPartitions.toSeq, NewPartition)
+    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, NewReplica)
+    partitionStateMachine.handleStateChanges(newPartitions.toSeq, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions).toSeq, OnlineReplica)
   }
 
   /**
@@ -542,7 +509,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
    */
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
+    if (!areReplicasInIsr(topicAndPartition, reassignedReplicas)) {
       info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
         "reassigned not yet caught up with the leader")
       val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
@@ -561,7 +528,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
       //5. replicas in RAR -> OnlineReplica
       reassignedReplicas.foreach { replica =>
-        replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
+        replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
           replica)), OnlineReplica)
       }
       //6. Set AR to RAR in memory.
@@ -584,15 +551,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     }
   }
 
-  private def watchIsrChangesForReassignedPartition(topic: String,
-                                                    partition: Int,
+  private def watchIsrChangesForReassignedPartition(partition: TopicAndPartition,
                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val isrChangeListener = new PartitionReassignmentIsrChangeListener(this, eventManager, topic, partition,
-      reassignedReplicas.toSet)
-    reassignedPartitionContext.isrChangeListener = isrChangeListener
+    val partitionReassignmentIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(this, eventManager, partition)
+    reassignedPartitionContext.partitionReassignmentIsrChangeHandler = partitionReassignmentIsrChangeHandler
     // register listener on the leader and isr path to wait until they catch up with the current leader
-    zkUtils.subscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener)
+    kafkaControllerZkUtils.registerZNodeChangeHandler(partitionReassignmentIsrChangeHandler)
   }
 
   def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
@@ -610,7 +574,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
           } else {
             info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
             // first register ISR change listener
-            watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
+            watchIsrChangesForReassignedPartition(topicAndPartition, reassignedPartitionContext)
             controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
             // mark topic ineligible for deletion for the partitions being reassigned
             topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
@@ -629,7 +593,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
-      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+      partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy))
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
@@ -663,49 +627,38 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   def incrementControllerEpoch() = {
-    try {
-      val newControllerEpoch = controllerContext.epoch + 1
-      val (updateSucceeded, newVersion) = zkUtils.conditionalUpdatePersistentPathIfExists(
-        ZkUtils.ControllerEpochPath, newControllerEpoch.toString, controllerContext.epochZkVersion)
-      if(!updateSucceeded)
+    val newControllerEpoch = controllerContext.epoch + 1
+    val setDataResponse = kafkaControllerZkUtils.setControllerEpochRaw(newControllerEpoch, controllerContext.epochZkVersion)
+    if (Code.get(setDataResponse.rc) == Code.OK) {
+      controllerContext.epochZkVersion = setDataResponse.stat.getVersion
+      controllerContext.epoch = newControllerEpoch
+    } else if (Code.get(setDataResponse.rc) == Code.NONODE) {
+      // if path doesn't exist, this is the first controller whose epoch should be 1
+      // the following call can still fail if another controller gets elected between checking if the path exists and
+      // trying to create the controller epoch path
+      val createResponse = kafkaControllerZkUtils.createControllerEpochRaw(KafkaController.InitialControllerEpoch)
+      if (Code.get(createResponse.rc) == Code.OK) {
+        controllerContext.epoch = KafkaController.InitialControllerEpoch
+        controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
+      } else if (Code.get(createResponse.rc) == Code.NODEEXISTS) {
         throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
-      else {
-        controllerContext.epochZkVersion = newVersion
-        controllerContext.epoch = newControllerEpoch
+      } else {
+        val exception = KeeperException.create(Code.get(createResponse.rc))
+        error("Error while incrementing controller epoch", exception)
+        throw exception
       }
-    } catch {
-      case _: ZkNoNodeException =>
-        // if path doesn't exist, this is the first controller whose epoch should be 1
-        // the following call can still fail if another controller gets elected between checking if the path exists and
-        // trying to create the controller epoch path
-        try {
-          zkUtils.createPersistentPath(ZkUtils.ControllerEpochPath, KafkaController.InitialControllerEpoch.toString)
-          controllerContext.epoch = KafkaController.InitialControllerEpoch
-          controllerContext.epochZkVersion = KafkaController.InitialControllerEpochZkVersion
-        } catch {
-          case _: ZkNodeExistsException => throw new ControllerMovedException("Controller moved to another broker. " +
-            "Aborting controller startup procedure")
-          case oe: Throwable => error("Error while incrementing controller epoch", oe)
-        }
-      case oe: Throwable => error("Error while incrementing controller epoch", oe)
-
+    } else {
+      throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure")
     }
-    info(s"Incremented epoch to ${controllerContext.epoch}")
-  }
-
-  private def registerSessionExpirationListener() = {
-    zkUtils.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
-  }
-
-  private def registerControllerChangeListener() = {
-    zkUtils.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
+    info("Controller %d incremented epoch to %d".format(config.brokerId, controllerContext.epoch))
   }
 
   private def initializeControllerContext() {
     // update controller cache with delete topic information
-    controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster().toSet
-    controllerContext.allTopics = zkUtils.getAllTopics().toSet
-    controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
+    controllerContext.liveBrokers = kafkaControllerZkUtils.getAllBrokersInCluster.toSet
+    controllerContext.allTopics = kafkaControllerZkUtils.getAllTopicsInCluster.toSet
+    registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
+    controllerContext.partitionReplicaAssignment = mutable.Map.empty ++ kafkaControllerZkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSet)
     controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
     // update the leader and isr cache for all existing partitions from Zookeeper
@@ -719,7 +672,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   private def fetchPendingPreferredReplicaElections(): Set[TopicAndPartition] = {
-    val partitionsUndergoingPreferredReplicaElection = zkUtils.getPartitionsUndergoingPreferredReplicaElection()
+    val partitionsUndergoingPreferredReplicaElection = kafkaControllerZkUtils.getPreferredReplicaElection
     // check if they are already completed or topic was deleted
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
@@ -755,7 +708,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
 
   private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
-    val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
+    val partitionsBeingReassigned = kafkaControllerZkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
     // check if they are already completed or topic was deleted
     val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
       val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
@@ -774,7 +727,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   private def fetchTopicDeletionsInProgress(): (Set[String], Set[String]) = {
-    val topicsToBeDeleted = zkUtils.getChildrenParentMayNotExist(ZkUtils.DeleteTopicsPath).toSet
+    val topicsToBeDeleted = kafkaControllerZkUtils.getTopicDeletions.toSet
     val topicsWithOfflineReplicas = controllerContext.partitionReplicaAssignment.filter { case (partition, replicas) =>
       replicas.exists(r => !controllerContext.isReplicaOnline(r, partition))
     }.keySet.map(_.topic)
@@ -797,15 +750,16 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     controllerContext.controllerChannelManager.startup()
   }
 
-  def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
-    val leaderAndIsrInfo = zkUtils.getPartitionLeaderAndIsrForTopics(topicAndPartitions)
-    for ((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
-      controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
+  def updateLeaderAndIsrCache(partitions: Seq[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keys.toSeq) {
+    val leaderIsrAndControllerEpochs = kafkaControllerZkUtils.getTopicPartitionStates(partitions)
+    leaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
+      controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    }
   }
 
-  private def areReplicasInIsr(topic: String, partition: Int, replicas: Seq[Int]): Boolean = {
-    zkUtils.getLeaderAndIsrForPartition(topic, partition).exists { leaderAndIsr =>
-      replicas.forall(leaderAndIsr.isr.contains)
+  private def areReplicasInIsr(partition: TopicAndPartition, replicas: Seq[Int]): Boolean = {
+    kafkaControllerZkUtils.getTopicPartitionStates(Seq(partition)).get(partition).exists { leaderIsrAndControllerEpoch =>
+      replicas.forall(leaderIsrAndControllerEpoch.leaderAndIsr.isr.contains)
     }
   }
 
@@ -821,7 +775,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
       // move the leader to one of the alive and caught up new replicas
-      partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
+      partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
     } else {
       // check if the leader is alive or not
       if (controllerContext.isReplicaOnline(currentLeader, topicAndPartition)) {
@@ -832,7 +786,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       } else {
         info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
           "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
-        partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
+        partitionStateMachine.handleStateChanges(Seq(topicAndPartition), OnlinePartition, Option(ReassignPartitionLeaderElectionStrategy))
       }
     }
   }
@@ -844,22 +798,28 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     val partition = topicAndPartition.partition
     // first move the replica to offline state (the controller removes it from the ISR)
     val replicasToBeDeleted = oldReplicas.map(r => PartitionAndReplica(topic, partition, r))
-    replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica)
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, OfflineReplica)
     // send stop replica command to the old replicas
-    replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted)
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionStarted)
     // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed
-    replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful)
-    replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica)
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, ReplicaDeletionSuccessful)
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted.toSeq, NonExistentReplica)
   }
 
-  private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
+  private def updateAssignedReplicasForPartition(partition: TopicAndPartition,
                                                  replicas: Seq[Int]) {
-    val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
-    partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
-    updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
-    info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
-    // update the assigned replica list after a successful zookeeper write
-    controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
+    val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(partition.topic))
+    partitionsAndReplicasForThisTopic.put(partition, replicas)
+    val setDataResponse = kafkaControllerZkUtils.setTopicAssignmentRaw(partition.topic, partitionsAndReplicasForThisTopic.toMap)
+    if (Code.get(setDataResponse.rc) == Code.OK) {
+      info("Updated assigned replicas for partition %s being reassigned to %s ".format(partition, replicas.mkString(",")))
+      // update the assigned replica list after a successful zookeeper write
+      controllerContext.partitionReplicaAssignment.put(partition, replicas)
+    } else if (Code.get(setDataResponse.rc) == Code.NONODE) {
+      throw new IllegalStateException("Topic %s doesn't exist".format(partition.topic))
+    } else {
+      throw new KafkaException(KeeperException.create(Code.get(setDataResponse.rc)))
+    }
   }
 
   private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
@@ -868,18 +828,18 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
     // replicas list
     newReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
+      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
     }
   }
 
-  private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+  private def updateLeaderEpochAndSendRequest(partition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
     val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
-    updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
+    updateLeaderEpoch(partition) match {
       case Some(updatedLeaderIsrAndControllerEpoch) =>
         try {
           brokerRequestBatch.newBatch()
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
-            topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, partition.topic,
+            partition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas, isNew = false)
           brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
         } catch {
           case e: IllegalStateException =>
@@ -887,97 +847,43 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
         }
         stateChangeLog.trace(s"Sent LeaderAndIsr request $updatedLeaderIsrAndControllerEpoch with new assigned replica " +
           s"list ${newAssignedReplicas.mkString(",")} to leader ${updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader} " +
-          s"for partition being reassigned $topicAndPartition")
+          s"for partition being reassigned $partition")
       case None => // fail the reassignment
         stateChangeLog.error("Failed to send LeaderAndIsr request with new assigned replica list " +
-          s"${newAssignedReplicas.mkString( ",")} to leader for partition being reassigned $topicAndPartition")
+          s"${newAssignedReplicas.mkString( ",")} to leader for partition being reassigned $partition")
     }
   }
 
-  private def registerBrokerChangeListener() = {
-    zkUtils.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
-  }
-
-  private def deregisterBrokerChangeListener() = {
-    zkUtils.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
-  }
-
-  private def registerTopicChangeListener() = {
-    zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
-  }
-
-  private def deregisterTopicChangeListener() = {
-    zkUtils.unsubscribeChildChanges(BrokerTopicsPath, topicChangeListener)
-  }
-
-  def registerPartitionModificationsListener(topic: String) = {
-    partitionModificationsListeners.put(topic, new PartitionModificationsListener(this, eventManager, topic))
-    zkUtils.subscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
-  }
-
-  def deregisterPartitionModificationsListener(topic: String) = {
-    zkUtils.unsubscribeDataChanges(getTopicPath(topic), partitionModificationsListeners(topic))
-    partitionModificationsListeners.remove(topic)
-  }
-
-  private def registerTopicDeletionListener() = {
-    zkUtils.subscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
-  }
-
-  private def deregisterTopicDeletionListener() = {
-    zkUtils.unsubscribeChildChanges(DeleteTopicsPath, topicDeletionListener)
-  }
-
-  private def registerPartitionReassignmentListener() = {
-    zkUtils.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
-  }
-
-  private def deregisterPartitionReassignmentListener() = {
-    zkUtils.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignmentListener)
-  }
-
-  private def registerIsrChangeNotificationListener() = {
-    debug("Registering IsrChangeNotificationListener")
-    zkUtils.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
-  }
-
-  private def deregisterIsrChangeNotificationListener() = {
-    debug("De-registering IsrChangeNotificationListener")
-    zkUtils.unsubscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
-  }
-
-  private def registerPreferredReplicaElectionListener() {
-    zkUtils.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+  private def registerPartitionModificationsHandlers(topics: Seq[String]) = {
+    topics.foreach { topic =>
+      val partitionModificationsHandler = new PartitionModificationsHandler(this, eventManager, topic)
+      partitionModificationsHandlers.put(topic, partitionModificationsHandler)
+    }
+    partitionModificationsHandlers.values.foreach(kafkaControllerZkUtils.registerZNodeChangeHandler)
   }
 
-  private def deregisterPreferredReplicaElectionListener() {
-    zkUtils.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+  def unregisterPartitionModificationsHandlers(topics: Seq[String]) = {
+    topics.foreach { topic =>
+      partitionModificationsHandlers.remove(topic)
+        .foreach(handler => kafkaControllerZkUtils.unregisterZNodeChangeHandler(handler.path))
+    }
   }
 
-  private def deregisterPartitionReassignmentIsrChangeListeners() {
+  private def unregisterPartitionReassignmentIsrChangeHandlers() {
     controllerContext.partitionsBeingReassigned.foreach {
       case (topicAndPartition, reassignedPartitionsContext) =>
-        val zkPartitionPath = getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
-        zkUtils.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
+        val partitionReassignmentIsrChangeHandler =
+          reassignedPartitionsContext.partitionReassignmentIsrChangeHandler
+        kafkaControllerZkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
     }
   }
 
-  private def registerLogDirEventNotificationListener() = {
-    debug("Registering logDirEventNotificationListener")
-    zkUtils.subscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
-  }
-
-  private def deregisterLogDirEventNotificationListener() = {
-    debug("De-registering logDirEventNotificationListener")
-    zkUtils.unsubscribeChildChanges(ZkUtils.LogDirEventNotificationPath, logDirEventNotificationListener)
-  }
-
   private def readControllerEpochFromZookeeper() {
     // initialize the controller epoch and zk version by reading from zookeeper
-    if(controllerContext.zkUtils.pathExists(ZkUtils.ControllerEpochPath)) {
-      val epochData = controllerContext.zkUtils.readData(ZkUtils.ControllerEpochPath)
-      controllerContext.epoch = epochData._1.toInt
-      controllerContext.epochZkVersion = epochData._2.getVersion
+    val epochAndStatOpt = kafkaControllerZkUtils.getControllerEpoch
+    epochAndStatOpt.foreach { case (epoch, stat) =>
+      controllerContext.epoch = epoch
+      controllerContext.epochZkVersion = stat.getVersion
       info("Initialized controller epoch to %d and zk version %d".format(controllerContext.epoch, controllerContext.epochZkVersion))
     }
   }
@@ -985,33 +891,35 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
     if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
       // stop watching the ISR changes for this partition
-      zkUtils.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
-        controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+      val partitionReassignmentIsrChangeHandler =
+        controllerContext.partitionsBeingReassigned(topicAndPartition).partitionReassignmentIsrChangeHandler
+      kafkaControllerZkUtils.unregisterZNodeChangeHandler(partitionReassignmentIsrChangeHandler.path)
     }
     // read the current list of reassigned partitions from zookeeper
-    val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
+    val partitionsBeingReassigned = kafkaControllerZkUtils.getPartitionReassignment.mapValues(replicas => ReassignedPartitionsContext(replicas))
     // remove this partition from that list
     val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
     // write the new list to zookeeper
-    if (updatedPartitionsBeingReassigned.size < partitionsBeingReassigned.size)
-      zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
+    val reassignment = updatedPartitionsBeingReassigned.mapValues(_.newReplicas)
+    if (reassignment.isEmpty) {
+      info("No more partitions need to be reassigned. Deleting zk path %s".format(ReassignPartitionsZNode.path))
+      kafkaControllerZkUtils.deletePartitionReassignment()
+    } else {
+      val setDataResponse = kafkaControllerZkUtils.setPartitionReassignmentRaw(reassignment)
+      if (Code.get(setDataResponse.rc) == Code.OK) {
+      } else if (Code.get(setDataResponse.rc) == Code.NONODE) {
+        val createDataResponse = kafkaControllerZkUtils.createPartitionReassignment(reassignment)
+        if (Code.get(createDataResponse.rc) != Code.OK) {
+          throw new AdminOperationException(KeeperException.create(Code.get(createDataResponse.rc)))
+        }
+      } else {
+        throw new AdminOperationException(KeeperException.create(Code.get(setDataResponse.rc)))
+      }
+    }
     // update the cache. NO-OP if the partition's reassignment was never started
     controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
   }
 
-  def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
-                                         newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
-    try {
-      val zkPath = getTopicPath(topicAndPartition.topic)
-      val jsonPartitionMap = zkUtils.replicaAssignmentZkData(newReplicaAssignmentForTopic.map(e => e._1.partition.toString -> e._2))
-      zkUtils.updatePersistentPath(zkPath, jsonPartitionMap)
-      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
-    } catch {
-      case _: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
-      case e2: Throwable => throw new KafkaException(e2.toString)
-    }
-  }
-
   def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition],
                                                    isTriggeredByAutoRebalance : Boolean) {
     for(partition <- partitionsToBeRemoved) {
@@ -1025,7 +933,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       }
     }
     if (!isTriggeredByAutoRebalance)
-      zkUtils.deletePath(ZkUtils.PreferredReplicaLeaderElectionPath)
+      kafkaControllerZkUtils.deletePreferredReplicaElection()
   }
 
   /**
@@ -1046,93 +954,22 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
   }
 
   /**
-   * Removes a given partition replica from the ISR; if it is not the current
-   * leader and there are sufficient remaining replicas in ISR.
-   *
-   * @param topic topic
-   * @param partition partition
-   * @param replicaId replica Id
-   * @return the new leaderAndIsr (with the replica removed if it was present),
-   *         or None if leaderAndIsr is empty.
-   */
-  def removeReplicaFromIsr(topic: String, partition: Int, replicaId: Int): Option[LeaderIsrAndControllerEpoch] = {
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    debug("Removing replica %d from ISR %s for partition %s.".format(replicaId,
-      controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.isr.mkString(","), topicAndPartition))
-    var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
-    var zkWriteCompleteOrUnnecessary = false
-    while (!zkWriteCompleteOrUnnecessary) {
-      // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
-      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
-        case Some(leaderIsrAndEpoch) => // increment the leader epoch even if the ISR changes
-          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
-          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
-          if(controllerEpoch > epoch)
-            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
-              "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
-              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
-          if (leaderAndIsr.isr.contains(replicaId)) {
-            // if the replica to be removed from the ISR is also the leader, set the new leader value to -1
-            val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
-            var newIsr = leaderAndIsr.isr.filter(b => b != replicaId)
-
-            // if the replica to be removed from the ISR is the last surviving member of the ISR and unclean leader election
-            // is disallowed for the corresponding topic, then we must preserve the ISR membership so that the replica can
-            // eventually be restored as the leader.
-            if (newIsr.isEmpty && !LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(zkUtils,
-              ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
-              info("Retaining last ISR %d of partition %s since unclean leader election is disabled".format(replicaId, topicAndPartition))
-              newIsr = leaderAndIsr.isr
-            }
-
-            val newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
-            // update the new leadership decision in zookeeper or retry
-            val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
-              newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
-
-            val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
-            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion, epoch))
-            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
-            if (updateSucceeded) {
-              info(s"New leader and ISR for partition $topicAndPartition is $leaderWithNewVersion")
-            }
-            updateSucceeded
-          } else {
-            warn(s"Cannot remove replica $replicaId from ISR of partition $topicAndPartition since it is not in the ISR." +
-              s" Leader = ${leaderAndIsr.leader} ; ISR = ${leaderAndIsr.isr}")
-            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderAndIsr, epoch))
-            controllerContext.partitionLeadershipInfo.put(topicAndPartition, finalLeaderIsrAndControllerEpoch.get)
-            true
-          }
-        case None =>
-          warn("Cannot remove replica %d from ISR of %s - leaderAndIsr is empty.".format(replicaId, topicAndPartition))
-          true
-      }
-    }
-    finalLeaderIsrAndControllerEpoch
-  }
-
-  /**
    * Does not change leader or isr, but just increments the leader epoch
    *
-   * @param topic topic
    * @param partition partition
    * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
    */
-  private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    debug("Updating leader epoch for partition %s.".format(topicAndPartition))
+  private def updateLeaderEpoch(partition: TopicAndPartition): Option[LeaderIsrAndControllerEpoch] = {
+    debug("Updating leader epoch for partition %s.".format(partition))
     var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
     var zkWriteCompleteOrUnnecessary = false
     while (!zkWriteCompleteOrUnnecessary) {
       // refresh leader and isr from zookeeper again
-      val leaderIsrAndEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
-      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
-        case Some(leaderIsrAndEpoch) =>
-          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
-          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
-          if(controllerEpoch > epoch)
+      zkWriteCompleteOrUnnecessary = kafkaControllerZkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+        case Some(leaderIsrAndControllerEpoch) =>
+          val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+          val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+          if (controllerEpoch > epoch)
             throw new StateChangeFailedException("Leader and isr path written by another controller. This probably" +
               "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
               "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
@@ -1140,19 +977,19 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
           // assigned replica list
           val newLeaderAndIsr = leaderAndIsr.newEpochAndZkVersion
           // update the new leadership decision in zookeeper or retry
-          val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic,
-            partition, newLeaderAndIsr, epoch, leaderAndIsr.zkVersion)
-
-          val leaderWithNewVersion = newLeaderAndIsr.withZkVersion(newVersion)
-          finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(leaderWithNewVersion, epoch))
-          if (updateSucceeded) {
-            info(s"Updated leader epoch for partition $topicAndPartition to ${leaderWithNewVersion.leaderEpoch}")
-          }
-          updateSucceeded
+          val (successfulUpdates, updatesToRetry, failedUpdates) =
+            kafkaControllerZkUtils.updateLeaderAndIsr(immutable.Map(partition -> newLeaderAndIsr), epoch)
+          if (successfulUpdates.contains(partition)) {
+            val finalLeaderAndIsr = successfulUpdates(partition)
+            finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(finalLeaderAndIsr, epoch))
+            info(s"Updated leader epoch for partition $partition to ${finalLeaderAndIsr.leaderEpoch}")
+            true
+          } else if (failedUpdates.contains(partition)) {
+            throw failedUpdates(partition)
+          } else false
         case None =>
-          throw new IllegalStateException(s"Cannot update leader epoch for partition $topicAndPartition as " +
+          throw new IllegalStateException(s"Cannot update leader epoch for partition $partition as " +
             "leaderAndIsr path is empty. This could mean we somehow tried to reassign a partition that doesn't exist")
-          true
       }
     }
     finalLeaderIsrAndControllerEpoch
@@ -1194,654 +1031,565 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
     }
   }
 
-  def getControllerID(): Int = {
-    controllerContext.zkUtils.readDataMaybeNull(ZkUtils.ControllerPath)._1 match {
-      case Some(controller) => KafkaController.parseControllerId(controller)
-      case None => -1
-    }
-  }
-
-  case class BrokerChange(currentBrokerList: Seq[String]) extends ControllerEvent {
+  case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
 
-    def state = ControllerState.BrokerChange
+    def state = ControllerState.AutoLeaderBalance
 
     override def process(): Unit = {
       if (!isActive) return
-      // Read the current broker list from ZK again instead of using currentBrokerList to increase
-      // the odds of processing recent broker changes in a single ControllerEvent (KAFKA-5502).
-      val curBrokers = zkUtils.getAllBrokersInCluster().toSet
-      val curBrokerIds = curBrokers.map(_.id)
-      val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
-      val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
-      val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
-      val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
-      controllerContext.liveBrokers = curBrokers
-      val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
-      val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
-      val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
-      info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
-        .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
-      newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
-      deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
-      if (newBrokerIds.nonEmpty)
-        onBrokerStartup(newBrokerIdsSorted)
-      if (deadBrokerIds.nonEmpty)
-        onBrokerFailure(deadBrokerIdsSorted)
+      try {
+        checkAndTriggerAutoLeaderRebalance()
+      } finally {
+        scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
+      }
     }
   }
 
-  case class TopicChange(topics: Set[String]) extends ControllerEvent {
+  case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
 
-    def state = ControllerState.TopicChange
+    def state = ControllerState.ControlledShutdown
 
     override def process(): Unit = {
-      if (!isActive) return
-      val newTopics = topics -- controllerContext.allTopics
-      val deletedTopics = controllerContext.allTopics -- topics
-      controllerContext.allTopics = topics
-
-      val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
-      controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
-        !deletedTopics.contains(p._1.topic))
-      controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
-      info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
-        deletedTopics, addedPartitionReplicaAssignment))
-      if (newTopics.nonEmpty)
-        onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
+      val controlledShutdownResult = Try { doControlledShutdown(id) }
+      controlledShutdownCallback(controlledShutdownResult)
     }
-  }
 
-  case class PartitionModifications(topic: String) extends ControllerEvent {
+    private def doControlledShutdown(id: Int): Set[TopicAndPartition] = {
+      if (!isActive) {
+        throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
+      }
 
-    def state = ControllerState.TopicChange
+      info("Shutting down broker " + id)
 
-    override def process(): Unit = {
-      if (!isActive) return
-      val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
-      val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
-        !controllerContext.partitionReplicaAssignment.contains(p._1))
-      if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
-        error("Skipping adding partitions %s for topic %s since it is currently being deleted"
-          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
-      else {
-        if (partitionsToBeAdded.nonEmpty) {
-          info(s"New partitions to be added $partitionsToBeAdded")
-          controllerContext.partitionReplicaAssignment.++=(partitionsToBeAdded)
-          onNewPartitionCreation(partitionsToBeAdded.keySet)
+      if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
+        throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+
+      controllerContext.shuttingDownBrokerIds.add(id)
+      debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
+      debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
+
+      val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter { partition =>
+        controllerContext.partitionReplicaAssignment(partition).size > 1 && controllerContext.partitionLeadershipInfo.contains(partition)
+      }
+      val (partitionsLeadByBroker, partitionsFollowedByBroker) = partitionsToActOn.partition { partition =>
+        controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
+      }
+      partitionStateMachine.handleStateChanges(partitionsLeadByBroker.toSeq, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy))
+      try {
+        brokerRequestBatch.newBatch()
+        partitionsFollowedByBroker.foreach { partition =>
+          brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), partition.topic,
+            partition.partition, deletePartition = false, null)
         }
+        brokerRequestBatch.sendRequestsToBrokers(epoch)
+      } catch {
+        case e: IllegalStateException =>
+          handleIllegalState(e)
+      }
+      // If the broker is a follower, updates the isr in ZK and notifies the current leader
+      replicaStateMachine.handleStateChanges(partitionsFollowedByBroker.map(partition => PartitionAndReplica(partition.topic, partition.partition, id)).toSeq, OfflineReplica)
+      def replicatedPartitionsBrokerLeads() = {
+        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+        controllerContext.partitionLeadershipInfo.filter {
+          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
+            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
+        }.keys
       }
+      replicatedPartitionsBrokerLeads().toSet
     }
   }
 
-  case class TopicDeletion(var topicsToBeDeleted: Set[String]) extends ControllerEvent {
+  case class LeaderAndIsrResponseReceived(LeaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
 
-    def state = ControllerState.TopicDeletion
+    def state = ControllerState.LeaderAndIsrResponseReceived
 
     override def process(): Unit = {
+      import JavaConverters._
       if (!isActive) return
-      debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
-      val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
-      if (nonExistentTopics.nonEmpty) {
-        warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
-        nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
+      val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
+
+      if (leaderAndIsrResponse.error != Errors.NONE) {
+        stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
+        return
       }
-      topicsToBeDeleted --= nonExistentTopics
-      if (config.deleteTopicEnable) {
-        if (topicsToBeDeleted.nonEmpty) {
-          info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
-          // mark topic ineligible for deletion if other state changes are in progress
-          topicsToBeDeleted.foreach { topic =>
-            val partitionReassignmentInProgress =
-              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
-            if (partitionReassignmentInProgress)
-              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
-          }
-          // add topic to deletion list
-          topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
-        }
-      } else {
-        // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
-        for (topic <- topicsToBeDeleted) {
-          info(s"Removing ${getDeleteTopicPath(topic)} since delete topic is disabled")
-          zkUtils.deletePath(getDeleteTopicPath(topic))
-        }
+
+      val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
+        new TopicAndPartition(_)).toSet
+      val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
+        new TopicAndPartition(_)).toSet
+      val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
+      val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
+      controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
+      val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
+
+      if (newOfflineReplicas.nonEmpty) {
+        stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
+        onReplicasBecomeOffline(newOfflineReplicas.map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)))
       }
     }
   }
 
-  case class PartitionReassignment(partitionReassignment: Map[TopicAndPartition, Seq[Int]]) extends ControllerEvent {
+  case class TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
 
-    def state = ControllerState.PartitionReassignment
+    def state = ControllerState.TopicDeletion
 
     override def process(): Unit = {
+      import JavaConverters._
       if (!isActive) return
-      val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
-      partitionsToBeReassigned.foreach { partitionToBeReassigned =>
-        if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
-          error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
-            .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
-          removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
-        } else {
-          val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
-          initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
-        }
+      val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
+      debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
+      val responseMap = stopReplicaResponse.responses.asScala
+      val partitionsInError =
+        if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
+        else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
+      val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
+      // move all the failed replicas to ReplicaDeletionIneligible
+      topicDeletionManager.failReplicaDeletion(replicasInError)
+      if (replicasInError.size != responseMap.size) {
+        // some replicas could have been successfully deleted
+        val deletedReplicas = responseMap.keySet -- partitionsInError
+        topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
       }
     }
-
   }
 
-  case class PartitionReassignmentIsrChange(topicAndPartition: TopicAndPartition, reassignedReplicas: Set[Int]) extends ControllerEvent {
+  case object Startup extends ControllerEvent {
 
-    def state = ControllerState.PartitionReassignment
+    def state = ControllerState.ControllerChange
 
     override def process(): Unit = {
-      if (!isActive) return
-        // check if this partition is still being reassigned or not
-      controllerContext.partitionsBeingReassigned.get(topicAndPartition).foreach { reassignedPartitionContext =>
-        // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
-        val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topicAndPartition.topic, topicAndPartition.partition)
-        newLeaderAndIsrOpt match {
-          case Some(leaderAndIsr) => // check if new replicas have joined ISR
-            val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
-            if(caughtUpReplicas == reassignedReplicas) {
-              // resume the partition reassignment process
-              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
-                "Resuming partition reassignment")
-              onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
-            }
-            else {
-              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
-                .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
-                "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
-            }
-          case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
-            .format(topicAndPartition, reassignedReplicas.mkString(",")))
-        }
-      }
+      kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
+      elect()
     }
-  }
 
-  case class IsrChangeNotification(sequenceNumbers: Seq[String]) extends ControllerEvent {
+  }
 
-    def state = ControllerState.IsrChange
+  private def updateMetrics(): Unit = {
+    offlinePartitionCount =
+      if (!isActive) {
+        0
+      } else {
+        controllerContext.partitionLeadershipInfo.count { case (tp, leadershipInfo) =>
+          !controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader) &&
+            !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)
+        }
+      }
 
-    override def process(): Unit = {
-      // Read the current isr change notification znodes from ZK again instead of using sequenceNumbers
-      // to increase the odds of processing recent isr changes in a single ControllerEvent
-      // and to reduce the odds of trying to access znodes that have already been deleted (KAFKA-5879).
-      val currentSequenceNumbers = zkUtils.getChildrenParentMayNotExist(ZkUtils.IsrChangeNotificationPath)
-      if (!isActive) return
-      try {
-        val topicAndPartitions = currentSequenceNumbers.flatMap(getTopicAndPartition).toSet
-        if (topicAndPartitions.nonEmpty) {
-          updateLeaderAndIsrCache(topicAndPartitions)
-          processUpdateNotifications(topicAndPartitions)
+    preferredReplicaImbalanceCount =
+      if (!isActive) {
+        0
+      } else {
+        controllerContext.partitionReplicaAssignment.count { case (topicPartition, replicas) =>
+          val preferredReplica = replicas.head
+          val leadershipInfo = controllerContext.partitionLeadershipInfo.get(topicPartition)
+          leadershipInfo.map(_.leaderAndIsr.leader != preferredReplica).getOrElse(false) &&
+            !topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic)
         }
-      } finally {
-        // delete the notifications
-        currentSequenceNumbers.map(x => controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
       }
-    }
 
-    private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
-      val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
-      debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
-      sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
-    }
+    globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size
 
-    private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
-      val changeZnode = ZkUtils.IsrChangeNotificationPath + "/" + child
-      val (jsonOpt, _) = controllerContext.zkUtils.readDataMaybeNull(changeZnode)
-      jsonOpt.map { json =>
-        Json.parseFull(json) match {
-          case Some(js) =>
-            val isrChanges = js.asJsonObject
-            isrChanges("partitions").asJsonArray.iterator.map(_.asJsonObject).map { tpJs =>
-              val topic = tpJs("topic").to[String]
-              val partition = tpJs("partition").to[Int]
-              TopicAndPartition(topic, partition)
-            }.toSet
-          case None =>
-            error(s"Invalid topic and partition JSON in ZK. ZK notification node: $changeZnode, JSON: $json")
-            Set.empty[TopicAndPartition]
-        }
-      }.getOrElse(Set.empty[TopicAndPartition])
-    }
+    globalPartitionCount = if (!isActive) 0 else controllerContext.partitionLeadershipInfo.size
+  }
+
+  // visible for testing
+  private[controller] def handleIllegalState(e: IllegalStateException): Nothing = {
+    // Resign if the controller is in an illegal state
+    error("Forcing the controller to resign")
+    brokerRequestBatch.clear()
+    triggerControllerMove()
+    throw e
+  }
+
+  private def triggerControllerMove(): Unit = {
+    onControllerResignation()
+    activeControllerId = -1
+    kafkaControllerZkUtils.deleteController()
+  }
 
+  def expire(): Unit = {
+    val expireEvent = Expire()
+    eventManager.clearAndPut(expireEvent)
+    expireEvent.waitUntilProcessed()
   }
 
-  case class LogDirEventNotification(sequenceNumbers: Seq[String]) extends ControllerEvent {
+  def newSession(): Unit = {
+    eventManager.put(Reelect)
+  }
 
-    def state = ControllerState.LogDirChange
+  def elect(): Unit = {
+    val timestamp = time.milliseconds
+    activeControllerId = kafkaControllerZkUtils.getControllerId.getOrElse(-1)
+    /*
+     * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
+     * it's possible that the controller has already been elected when we get here. This check will prevent the following
+     * checkedEphemeralCreate call from getting into an infinite loop if this broker is already the controller.
+     */
+    if (activeControllerId != -1) {
+      debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
+      return
+    }
 
-    override def process(): Unit = {
-      val zkUtils = controllerContext.zkUtils
-      try {
-        val brokerIds = sequenceNumbers.flatMap(LogDirUtils.getBrokerIdFromLogDirEvent(zkUtils, _))
-        onBrokerLogDirFailure(brokerIds)
-      } finally {
-        // delete processed children
-        sequenceNumbers.map(x => zkUtils.deletePath(ZkUtils.LogDirEventNotificationPath + "/" + x))
-      }
+    try {
+      kafkaControllerZkUtils.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
+      info(config.brokerId + " successfully elected as the controller")
+      activeControllerId = config.brokerId
+      onControllerFailover()
+    } catch {
+      case _: NodeExistsException =>
+        // If someone else has written the path, then re-read which broker is now the controller
+        activeControllerId = kafkaControllerZkUtils.getControllerId.getOrElse(-1)
+
+        if (activeControllerId != -1)
+          debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))
+        else
+          warn("A controller has been elected but just resigned, this will result in another round of election")
+
+      case e2: Throwable =>
+        error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
+        triggerControllerMove()
     }
   }
 
-  case class PreferredReplicaLeaderElection(partitions: Set[TopicAndPartition]) extends ControllerEvent {
-
-    def state = ControllerState.ManualLeaderBalance
+  case object BrokerChange extends ControllerEvent {
+    override def state: ControllerState = ControllerState.BrokerChange
 
     override def process(): Unit = {
       if (!isActive) return
-      val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
-      if (partitionsForTopicsToBeDeleted.nonEmpty) {
-        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
-          .format(partitionsForTopicsToBeDeleted))
-      }
-      onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
+      val curBrokers = kafkaControllerZkUtils.getAllBrokersInCluster.toSet
+      val curBrokerIds = curBrokers.map(_.id)
+      val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
+      val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
+      val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
+      val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
+      controllerContext.liveBrokers = curBrokers
+      val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
+      val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
+      val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
+      info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
+        .format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
+      newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
+      deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
+      if (newBrokerIds.nonEmpty)
+        onBrokerStartup(newBrokerIdsSorted)
+      if (deadBrokerIds.nonEmpty)
+        onBrokerFailure(deadBrokerIdsSorted)
     }
-
   }
 
-  case object AutoPreferredReplicaLeaderElection extends ControllerEvent {
+  case object TopicChange extends ControllerEvent {
+    override def state: ControllerState = ControllerState.TopicChange
 
-    def state = ControllerState.AutoLeaderBalance
+    override def process(): Unit = {
+      if (!isActive) return
+      val topics = kafkaControllerZkUtils.getAllTopicsInCluster.toSet
+      val newTopics = topics -- controllerContext.allTopics
+      val deletedTopics = controllerContext.allTopics -- topics
+      controllerContext.allTopics = topics
+
+      registerPartitionModificationsHandlers(newTopics.toSeq)
+      val addedPartitionReplicaAssignment = kafkaControllerZkUtils.getReplicaAssignmentForTopics(newTopics)
+      controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
+        !deletedTopics.contains(p._1.topic))
+      controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
+      info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
+        deletedTopics, addedPartitionReplicaAssignment))
+      if (addedPartitionReplicaAssignment.nonEmpty)
+        onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
+    }
+  }
+
+  case object LogDirEventNotification extends ControllerEvent {
+    override def state: ControllerState = ControllerState.LogDirChange
 
     override def process(): Unit = {
       if (!isActive) return
+      val sequenceNumbers = kafkaControllerZkUtils.getAllLogDirEventNotifications
       try {
-        checkAndTriggerAutoLeaderRebalance()
+        val brokerIds = kafkaControllerZkUtils.getBrokerIdsFromLogDirEvents(sequenceNumbers)
+        onBrokerLogDirFailure(brokerIds)
       } finally {
-        scheduleAutoLeaderRebalanceTask(delay = config.leaderImbalanceCheckIntervalSeconds, unit = TimeUnit.SECONDS)
+        // delete processed children
+        kafkaControllerZkUtils.deleteLogDirEventNotifications(sequenceNumbers)
       }
     }
   }
 
-  case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicAndPartition]] => Unit) extends ControllerEvent {
-
-    def state = ControllerState.ControlledShutdown
+  case class PartitionModifications(topic: String) extends ControllerEvent {
+    override def state: ControllerState = ControllerState.TopicChange
 
     override def process(): Unit = {
-      val controlledShutdownResult = Try { doControlledShutdown(id) }
-      controlledShutdownCallback(controlledShutdownResult)
-    }
-
-    private def doControlledShutdown(id: Int): Set[TopicAndPartition] = {
-      if (!isActive) {
-        throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
+      if (!isActive) return
+      val partitionReplicaAssignment = kafkaControllerZkUtils.getReplicaAssignmentForTopics(immutable.Set(topic))
+      val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
+        !controllerContext.partitionReplicaAssignment.contains(p._1))
+      if(topicDeletionManager.isTopicQueuedUpForDeletion(topic))
+        error("Skipping adding partitions %s for topic %s since it is currently being deleted"
+          .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+      else {
+        if (partitionsToBeAdded.nonEmpty) {
+          info(s"New partitions to be added $partitionsToBeAdded")
+          controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
+          onNewPartitionCreation(partitionsToBeAdded.keySet)
+        }
       }
+    }
+  }
 
-      info("Shutting down broker " + id)
-
-      if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
-        throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
+  case object TopicDeletion extends ControllerEvent {
+    override def state: ControllerState = ControllerState.TopicDeletion
 
-      controllerContext.shuttingDownBrokerIds.add(id)
-      debug("All shutting down brokers: " + controllerContext.shuttingDownBrokerIds.mkString(","))
-      debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
-
-      val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
-          controllerContext.partitionsOnBroker(id)
-            .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
-
-      allPartitionsAndReplicationFactorOnBroker.foreach { case (topicAndPartition, replicationFactor) =>
-        controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
-          if (replicationFactor > 1) {
-            if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
-              // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
-              // notifies all affected brokers
-              partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                controlledShutdownPartitionLeaderSelector)
-            } else {
-              // Stop the replica first. The state change below initiates ZK changes which should take some time
-              // before which the stop replica request should be completed (in most cases)
-              try {
-                brokerRequestBatch.newBatch()
-                brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
-                  topicAndPartition.partition, deletePartition = false)
-                brokerRequestBatch.sendRequestsToBrokers(epoch)
-              } catch {
-                case e: IllegalStateException =>
-                  handleIllegalState(e)
-              }
-              // If the broker is a follower, updates the isr in ZK and notifies the current leader
-              replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
-                topicAndPartition.partition, id)), OfflineReplica)
-            }
+    override def process(): Unit = {
+      if (!isActive) return
+      var topicsToBeDeleted = kafkaControllerZkUtils.getTopicDeletions.toSet
+      debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
+      val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
+      if (nonExistentTopics.nonEmpty) {
+        warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
+        kafkaControllerZkUtils.deleteTopicDeletions(nonExistentTopics.toSeq)
+      }
+      topicsToBeDeleted --= nonExistentTopics
+      if (config.deleteTopicEnable) {
+        if (topicsToBeDeleted.nonEmpty) {
+          info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
+          // mark topic ineligible for deletion if other state changes are in progress
+          topicsToBeDeleted.foreach { topic =>
+            val partitionReassignmentInProgress =
+              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+            if (partitionReassignmentInProgress)
+              topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
           }
+          // add topic to deletion list
+          topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
         }
+      } else {
+        // If delete topic is disabled remove entries under zookeeper path : /admin/delete_topics
+        info(s"Removing $topicsToBeDeleted since delete topic is disabled")
+        kafkaControllerZkUtils.deleteTopicDeletions(topicsToBeDeleted.toSeq)
       }
-      def replicatedPartitionsBrokerLeads() = {
-        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
-        controllerContext.partitionLeadershipInfo.filter {
-          case (topicAndPartition, leaderIsrAndControllerEpoch) =>
-            leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
-        }.keys
-      }
-      replicatedPartitionsBrokerLeads().toSet
     }
   }
 
-  case class LeaderAndIsrResponseReceived(LeaderAndIsrResponseObj: AbstractResponse, brokerId: Int) extends ControllerEvent {
-
-    def state = ControllerState.LeaderAndIsrResponseReceived
+  case object PartitionReassignment extends ControllerEvent {
+    override def state: ControllerState = ControllerState.PartitionReassignment
 
     override def process(): Unit = {
-      import JavaConverters._
-      val leaderAndIsrResponse = LeaderAndIsrResponseObj.asInstanceOf[LeaderAndIsrResponse]
-
-      if (leaderAndIsrResponse.error != Errors.NONE) {
-        stateChangeLogger.error(s"Received error in LeaderAndIsr response $leaderAndIsrResponse from broker $brokerId")
-        return
+      if (!isActive) return
+      kafkaControllerZkUtils.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)
+      val partitionReassignment = kafkaControllerZkUtils.getPartitionReassignment
+      val partitionsToBeReassigned = partitionReassignment.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
+      partitionsToBeReassigned.foreach { partitionToBeReassigned =>
+        if(topicDeletionManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
+          error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
+            .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
+          removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
+        } else {
+          val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
+          initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+        }
       }
+    }
+  }
 
-      val offlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.KAFKA_STORAGE_ERROR).keys.map(
-        new TopicAndPartition(_)).toSet
-      val onlineReplicas = leaderAndIsrResponse.responses().asScala.filter(_._2 == Errors.NONE).keys.map(
-        new TopicAndPartition(_)).toSet
-      val previousOfflineReplicas = controllerContext.replicasOnOfflineDirs.getOrElse(brokerId, Set.empty[TopicAndPartition])
-      val currentOfflineReplicas = previousOfflineReplicas -- onlineReplicas ++ offlineReplicas
-      controllerContext.replicasOnOfflineDirs.put(brokerId, currentOfflineReplicas)
-      val newOfflineReplicas = currentOfflineReplicas -- previousOfflineReplicas
+  case class PartitionReassignmentIsrChange(partition: TopicAndPartition) extends ControllerEvent {
+    override def state: ControllerState = ControllerState.PartitionReassignment
 
-      if (newOfflineReplicas.nonEmpty) {
-        stateChangeLogger.info(s"Mark replicas ${newOfflineReplicas.mkString(",")} on broker $brokerId as offline")
-        onReplicasBecomeOffline(newOfflineReplicas.map(tp => PartitionAndReplica(tp.topic, tp.partition, brokerId)))
+    override def process(): Unit = {
+      if (!isActive) return
+      // check if this partition is still being reassigned or not
+      controllerContext.partitionsBeingReassigned.get(partition).foreach { reassignedPartitionContext =>
+        val reassignedReplicas = reassignedPartitionContext.newReplicas.toSet
+        kafkaControllerZkUtils.getTopicPartitionStates(Seq(partition)).get(partition) match {
+          case Some(leaderIsrAndControllerEpoch) => // check if new replicas have joined ISR
+            val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+            val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
+            if(caughtUpReplicas == reassignedReplicas) {
+              // resume the partition reassignment process
+              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
+                "Resuming partition reassignment")
+              onPartitionReassignment(partition, reassignedPartitionContext)
+            }
+            else {
+              info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+                .format(caughtUpReplicas.size, reassignedReplicas.size, partition) +
+                "Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
+            }
+          case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
+            .format(partit

<TRUNCATED>

[3/5] kafka git commit: KAFKA-5642; Use async ZookeeperClient in Controller

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
new file mode 100644
index 0000000..cfe6c6d
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/KafkaControllerZkUtils.scala
@@ -0,0 +1,684 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import java.util.Properties
+
+import kafka.api.LeaderAndIsr
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.log.LogConfig
+import kafka.server.ConfigType
+import kafka.utils.{Logging, ZkUtils}
+import org.apache.zookeeper.KeeperException.Code
+import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.{CreateMode, KeeperException}
+
+import scala.collection.mutable
+
+class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean) extends Logging {
+  /**
+   * Gets topic partition states for the given partitions.
+   * @param partitions the partitions for which we want to get states.
+   * @return sequence of GetDataResponses whose contexts are the partitions they are associated with.
+   */
+  def getTopicPartitionStatesRaw(partitions: Seq[TopicAndPartition]): Seq[GetDataResponse] = {
+    val getDataRequests = partitions.map { partition =>
+      GetDataRequest(TopicPartitionStateZNode.path(partition), partition)
+    }
+    retryUntilConnected(getDataRequests).map(_.asInstanceOf[GetDataResponse])
+  }
+
+  /**
+   * Sets topic partition states for the given partitions.
+   * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
+   * @return sequence of SetDataResponse whose contexts are the partitions they are associated with.
+   */
+  def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
+    val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      val path = TopicPartitionStateZNode.path(partition)
+      val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
+      SetDataRequest(path, data, leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion, partition)
+    }
+    retryUntilConnected(setDataRequests.toSeq).map(_.asInstanceOf[SetDataResponse])
+  }
+
+  /**
+   * Creates topic partition state znodes for the given partitions.
+   * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
+   * @return sequence of CreateResponse whose contexts are the partitions they are associated with.
+   */
+  def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
+    createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq)
+    createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq)
+    val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      val path = TopicPartitionStateZNode.path(partition)
+      val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
+      CreateRequest(path, data, acls(path), CreateMode.PERSISTENT, partition)
+    }
+    retryUntilConnected(createRequests.toSeq).map(_.asInstanceOf[CreateResponse])
+  }
+
+  /**
+   * Sets the controller epoch conditioned on the given epochZkVersion.
+   * @param epoch the epoch to set
+   * @param epochZkVersion the expected version number of the epoch znode.
+   * @return SetDataResponse
+   */
+  def setControllerEpochRaw(epoch: Int, epochZkVersion: Int): SetDataResponse = {
+    val setDataRequest = SetDataRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), epochZkVersion, null)
+    retryUntilConnected(setDataRequest).asInstanceOf[SetDataResponse]
+  }
+
+  /**
+   * Creates the controller epoch znode.
+   * @param epoch the epoch to set
+   * @return CreateResponse
+   */
+  def createControllerEpochRaw(epoch: Int): CreateResponse = {
+    val createRequest = CreateRequest(ControllerEpochZNode.path, ControllerEpochZNode.encode(epoch), acls(ControllerEpochZNode.path), CreateMode.PERSISTENT, null)
+    retryUntilConnected(createRequest).asInstanceOf[CreateResponse]
+  }
+
+  /**
+   * Try to update the partition states of multiple partitions in zookeeper.
+   * @param leaderAndIsrs The partition states to update.
+   * @param controllerEpoch The current controller epoch.
+   * @return A tuple of three values:
+   *         1. The successfully updated partition states with adjusted znode versions.
+   *         2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
+   *         the partition leader updated partition state while the controller attempted to update partition state.
+   *         3. Exceptions corresponding to failed partition state updates.
+   */
+  def updateLeaderAndIsr(leaderAndIsrs: Map[TopicAndPartition, LeaderAndIsr], controllerEpoch: Int):
+  (Map[TopicAndPartition, LeaderAndIsr],
+    Seq[TopicAndPartition],
+    Map[TopicAndPartition, Exception]) = {
+    val successfulUpdates = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
+    val updatesToRetry = mutable.Buffer.empty[TopicAndPartition]
+    val failed = mutable.Map.empty[TopicAndPartition, Exception]
+    val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) => partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) }
+    val setDataResponses = try {
+      setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+    } catch {
+      case e: Exception =>
+        leaderAndIsrs.keys.foreach(partition => failed.put(partition, e))
+        return (successfulUpdates.toMap, updatesToRetry, failed.toMap)
+    }
+    setDataResponses.foreach { setDataResponse =>
+      val partition = setDataResponse.ctx.asInstanceOf[TopicAndPartition]
+      if (Code.get(setDataResponse.rc) == Code.OK) {
+        val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
+        successfulUpdates.put(partition, updatedLeaderAndIsr)
+      } else if (Code.get(setDataResponse.rc) == Code.BADVERSION) {
+        updatesToRetry += partition
+      } else {
+        failed.put(partition, KeeperException.create(Code.get(setDataResponse.rc)))
+      }
+    }
+    (successfulUpdates.toMap, updatesToRetry, failed.toMap)
+  }
+
+  /**
+   * Get log configs that merge local configs with topic-level configs in zookeeper.
+   * @param topics The topics to get log configs for.
+   * @param config The local configs.
+   * @return A tuple of two values:
+   *         1. The successfully gathered log configs
+   *         2. Exceptions corresponding to failed log config lookups.
+   */
+  def getLogConfigs(topics: Seq[String], config: java.util.Map[String, AnyRef]):
+  (Map[String, LogConfig], Map[String, Exception]) = {
+    val logConfigs = mutable.Map.empty[String, LogConfig]
+    val failed = mutable.Map.empty[String, Exception]
+    val configResponses = try {
+      getTopicConfigs(topics)
+    } catch {
+      case e: Exception =>
+        topics.foreach(topic => failed.put(topic, e))
+        return (logConfigs.toMap, failed.toMap)
+    }
+    configResponses.foreach { configResponse =>
+      val topic = configResponse.ctx.asInstanceOf[String]
+      if (Code.get(configResponse.rc) == Code.OK) {
+        val overrides = ConfigEntityZNode.decode(configResponse.data)
+        val logConfig = LogConfig.fromProps(config, overrides.getOrElse(new Properties))
+        logConfigs.put(topic, logConfig)
+      } else if (Code.get(configResponse.rc) == Code.NONODE) {
+        val logConfig = LogConfig.fromProps(config, new Properties)
+        logConfigs.put(topic, logConfig)
+      } else {
+        failed.put(topic, KeeperException.create(Code.get(configResponse.rc)))
+      }
+    }
+    (logConfigs.toMap, failed.toMap)
+  }
+
+  /**
+   * Gets all brokers in the cluster.
+   * @return sequence of brokers in the cluster.
+   */
+  def getAllBrokersInCluster: Seq[Broker] = {
+    val getChildrenResponse = retryUntilConnected(GetChildrenRequest(BrokerIdsZNode.path, null)).asInstanceOf[GetChildrenResponse]
+    if (Code.get(getChildrenResponse.rc) == Code.OK) {
+      val brokerIds = getChildrenResponse.children.map(_.toInt)
+      val getDataRequests = brokerIds.map(brokerId => GetDataRequest(BrokerIdZNode.path(brokerId), brokerId))
+      val getDataResponses = retryUntilConnected(getDataRequests).map(_.asInstanceOf[GetDataResponse])
+      getDataResponses.flatMap { getDataResponse =>
+        val brokerId = getDataResponse.ctx.asInstanceOf[Int]
+        if (Code.get(getDataResponse.rc) == Code.OK) {
+          Option(BrokerIdZNode.decode(brokerId, getDataResponse.data))
+        } else if (Code.get(getDataResponse.rc) == Code.NONODE) {
+          None
+        } else {
+          throw KeeperException.create(Code.get(getDataResponse.rc))
+        }
+      }
+    } else if (Code.get(getChildrenResponse.rc) == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw KeeperException.create(Code.get(getChildrenResponse.rc))
+    }
+  }
+
+  /**
+   * Gets all topics in the cluster.
+   * @return sequence of topics in the cluster.
+   */
+  def getAllTopicsInCluster: Seq[String] = {
+    val getChildrenResponse = retryUntilConnected(GetChildrenRequest(TopicsZNode.path, null)).asInstanceOf[GetChildrenResponse]
+    if (Code.get(getChildrenResponse.rc) == Code.OK) {
+      getChildrenResponse.children
+    } else if (Code.get(getChildrenResponse.rc) == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw KeeperException.create(Code.get(getChildrenResponse.rc))
+    }
+  }
+
+  /**
+   * Sets the topic znode with the given assignment.
+   * @param topic the topic whose assignment is being set.
+   * @param assignment the partition to replica mapping to set for the given topic
+   * @return SetDataResponse
+   */
+  def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
+    val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), -1, null)
+    retryUntilConnected(setDataRequest).asInstanceOf[SetDataResponse]
+  }
+
+  /**
+   * Gets the log dir event notifications as strings. These strings are the znode names and not the absolute znode path.
+   * @return sequence of znode names and not the absolute znode path.
+   */
+  def getAllLogDirEventNotifications: Seq[String] = {
+    val getChildrenResponse = retryUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path, null)).asInstanceOf[GetChildrenResponse]
+    if (Code.get(getChildrenResponse.rc) == Code.OK) {
+      getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)
+    } else if (Code.get(getChildrenResponse.rc) == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw KeeperException.create(Code.get(getChildrenResponse.rc))
+    }
+  }
+
+  /**
+   * Reads each of the log dir event notifications associated with the given sequence numbers and extracts the broker ids.
+   * @param sequenceNumbers the sequence numbers associated with the log dir event notifications.
+   * @return broker ids associated with the given log dir event notifications.
+   */
+  def getBrokerIdsFromLogDirEvents(sequenceNumbers: Seq[String]): Seq[Int] = {
+    val getDataRequests = sequenceNumbers.map(sequenceNumber => GetDataRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), null))
+    val getDataResponses = retryUntilConnected(getDataRequests).map(_.asInstanceOf[GetDataResponse])
+    getDataResponses.flatMap { getDataResponse =>
+      if (Code.get(getDataResponse.rc) == Code.OK) {
+        LogDirEventNotificationSequenceZNode.decode(getDataResponse.data)
+      } else if (Code.get(getDataResponse.rc) == Code.NONODE) {
+        None
+      } else {
+        throw KeeperException.create(Code.get(getDataResponse.rc))
+      }
+    }
+  }
+
+  /**
+   * Deletes all log dir event notifications.
+   */
+  def deleteLogDirEventNotifications(): Unit = {
+    val getChildrenResponse = retryUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path, null)).asInstanceOf[GetChildrenResponse]
+    if (Code.get(getChildrenResponse.rc) == Code.OK) {
+      deleteLogDirEventNotifications(getChildrenResponse.children)
+    } else if (Code.get(getChildrenResponse.rc) != Code.NONODE) {
+      throw KeeperException.create(Code.get(getChildrenResponse.rc))
+    }
+  }
+
+  /**
+   * Deletes the log dir event notifications associated with the given sequence numbers.
+   * @param sequenceNumbers the sequence numbers associated with the log dir event notifications to be deleted.
+   */
+  def deleteLogDirEventNotifications(sequenceNumbers: Seq[String]): Unit = {
+    val deleteRequests = sequenceNumbers.map(sequenceNumber => DeleteRequest(LogDirEventNotificationSequenceZNode.path(sequenceNumber), -1, null))
+    retryUntilConnected(deleteRequests)
+  }
+
+  /**
+   * Gets the assignments for the given topics.
+   * @param topics the topics whose partitions we wish to get the assignments for.
+   * @return the replica assignment for each partition from the given topics.
+   */
+  def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicAndPartition, Seq[Int]] = {
+    val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), topic))
+    val getDataResponses = retryUntilConnected(getDataRequests.toSeq).map(_.asInstanceOf[GetDataResponse])
+    getDataResponses.flatMap { getDataResponse =>
+      val topic = getDataResponse.ctx.asInstanceOf[String]
+      if (Code.get(getDataResponse.rc) == Code.OK) {
+        TopicZNode.decode(topic, getDataResponse.data)
+      } else if (Code.get(getDataResponse.rc) == Code.NONODE) {
+        Map.empty[TopicAndPartition, Seq[Int]]
+      } else {
+        throw KeeperException.create(Code.get(getDataResponse.rc))
+      }
+    }.toMap
+  }
+
+  /**
+   * Get all topics marked for deletion.
+   * @return sequence of topics marked for deletion.
+   */
+  def getTopicDeletions: Seq[String] = {
+    val getChildrenResponse = retryUntilConnected(GetChildrenRequest(DeleteTopicsZNode.path, null)).asInstanceOf[GetChildrenResponse]
+    if (Code.get(getChildrenResponse.rc) == Code.OK) {
+      getChildrenResponse.children
+    } else if (Code.get(getChildrenResponse.rc) == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw KeeperException.create(Code.get(getChildrenResponse.rc))
+    }
+  }
+
+  /**
+   * Remove the given topics from the topics marked for deletion.
+   * @param topics the topics to remove.
+   */
+  def deleteTopicDeletions(topics: Seq[String]): Unit = {
+    val deleteRequests = topics.map(topic => DeleteRequest(DeleteTopicsTopicZNode.path(topic), -1, null))
+    retryUntilConnected(deleteRequests)
+  }
+
+  /**
+   * Returns all reassignments.
+   * @return the reassignments for each partition.
+   */
+  def getPartitionReassignment: Map[TopicAndPartition, Seq[Int]] = {
+    val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path, null)
+    val getDataResponse = retryUntilConnected(getDataRequest).asInstanceOf[GetDataResponse]
+    if (Code.get(getDataResponse.rc) == Code.OK) {
+      ReassignPartitionsZNode.decode(getDataResponse.data)
+    } else if (Code.get(getDataResponse.rc) == Code.NONODE) {
+      Map.empty[TopicAndPartition, Seq[Int]]
+    } else {
+      throw KeeperException.create(Code.get(getDataResponse.rc))
+    }
+  }
+
+  /**
+   * Sets the partition reassignment znode with the given reassignment.
+   * @param reassignment the reassignment to set on the reassignment znode.
+   * @return SetDataResponse
+   */
+  def setPartitionReassignmentRaw(reassignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
+    val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), -1, null)
+    retryUntilConnected(setDataRequest).asInstanceOf[SetDataResponse]
+  }
+
+  /**
+   * Creates the partition reassignment znode with the given reassignment.
+   * @param reassignment the reassignment to set on the reassignment znode.
+   * @return CreateResponse
+   */
+  def createPartitionReassignment(reassignment: Map[TopicAndPartition, Seq[Int]]): CreateResponse = {
+    val createRequest = CreateRequest(ReassignPartitionsZNode.path, ReassignPartitionsZNode.encode(reassignment), acls(ReassignPartitionsZNode.path), CreateMode.PERSISTENT, null)
+    retryUntilConnected(createRequest).asInstanceOf[CreateResponse]
+  }
+
+  /**
+   * Deletes the partition reassignment znode.
+   */
+  def deletePartitionReassignment(): Unit = {
+    val deleteRequest = DeleteRequest(ReassignPartitionsZNode.path, -1, null)
+    retryUntilConnected(deleteRequest)
+  }
+
+  /**
+   * Gets topic partition states for the given partitions.
+   * @param partitions the partitions for which we want to get states.
+   * @return map containing the LeaderIsrAndControllerEpoch of each partition for which we were able to look up the partition state.
+   */
+  def getTopicPartitionStates(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+    val getDataResponses = getTopicPartitionStatesRaw(partitions)
+    getDataResponses.flatMap { getDataResponse =>
+      val partition = getDataResponse.ctx.asInstanceOf[TopicAndPartition]
+      if (Code.get(getDataResponse.rc) == Code.OK) {
+        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _)
+      } else if (Code.get(getDataResponse.rc) == Code.NONODE) {
+        None
+      } else {
+        throw KeeperException.create(Code.get(getDataResponse.rc))
+      }
+    }.toMap
+  }
+
+  /**
+   * Gets the isr change notifications as strings. These strings are the znode names and not the absolute znode path.
+   * @return sequence of znode names and not the absolute znode path.
+   */
+  def getAllIsrChangeNotifications: Seq[String] = {
+    val getChildrenResponse = retryUntilConnected(GetChildrenRequest(IsrChangeNotificationZNode.path, null)).asInstanceOf[GetChildrenResponse]
+    if (Code.get(getChildrenResponse.rc) == Code.OK) {
+      getChildrenResponse.children.map(IsrChangeNotificationSequenceZNode.sequenceNumber)
+    } else if (Code.get(getChildrenResponse.rc) == Code.NONODE) {
+      Seq.empty
+    } else {
+      throw KeeperException.create(Code.get(getChildrenResponse.rc))
+    }
+  }
+
+  /**
+   * Reads the isr change notification znode of every given sequence number and collects the partitions decoded
+   * from their data. Notifications whose znode no longer exists are skipped.
+   * @param sequenceNumbers the sequence numbers associated with the isr change notifications.
+   * @return partitions associated with the given isr change notifications.
+   */
+  def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicAndPartition] = {
+    val requests = sequenceNumbers.map { sequenceNumber =>
+      GetDataRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber), null)
+    }
+    val responses = retryUntilConnected(requests).map(_.asInstanceOf[GetDataResponse])
+    responses.flatMap { response =>
+      Code.get(response.rc) match {
+        case Code.OK => IsrChangeNotificationSequenceZNode.decode(response.data)
+        case Code.NONODE => None
+        case errorCode => throw KeeperException.create(errorCode)
+      }
+    }
+  }
+
+  /**
+   * Deletes every isr change notification currently listed under the isr change notification znode.
+   * Does nothing if that znode does not exist.
+   */
+  def deleteIsrChangeNotifications(): Unit = {
+    val childrenRequest = GetChildrenRequest(IsrChangeNotificationZNode.path, null)
+    val childrenResponse = retryUntilConnected(childrenRequest).asInstanceOf[GetChildrenResponse]
+    Code.get(childrenResponse.rc) match {
+      case Code.OK => deleteIsrChangeNotifications(childrenResponse.children)
+      case Code.NONODE => // nothing to delete
+      case errorCode => throw KeeperException.create(errorCode)
+    }
+  }
+
+  /**
+   * Issues a delete for the isr change notification znode of each given sequence number.
+   * The delete responses are not inspected.
+   * @param sequenceNumbers the sequence numbers associated with the isr change notifications to be deleted.
+   */
+  def deleteIsrChangeNotifications(sequenceNumbers: Seq[String]): Unit = {
+    val notificationPaths = sequenceNumbers.map(sequenceNumber => IsrChangeNotificationSequenceZNode.path(sequenceNumber))
+    val requests = notificationPaths.map(notificationPath => DeleteRequest(notificationPath, -1, null))
+    retryUntilConnected(requests)
+  }
+
+  /**
+   * Reads the preferred replica election znode and decodes the partitions stored in it.
+   * @return the set of partitions marked for preferred replica election, or an empty set if the znode does not exist.
+   */
+  def getPreferredReplicaElection: Set[TopicAndPartition] = {
+    val response = retryUntilConnected(GetDataRequest(PreferredReplicaElectionZNode.path, null)).asInstanceOf[GetDataResponse]
+    Code.get(response.rc) match {
+      case Code.OK => PreferredReplicaElectionZNode.decode(response.data)
+      case Code.NONODE => Set.empty[TopicAndPartition]
+      case errorCode => throw KeeperException.create(errorCode)
+    }
+  }
+
+  /**
+   * Issues a delete for the preferred replica election znode. The delete response is not inspected.
+   */
+  def deletePreferredReplicaElection(): Unit = {
+    retryUntilConnected(DeleteRequest(PreferredReplicaElectionZNode.path, -1, null))
+  }
+
+  /**
+   * Reads the controller znode and decodes the controller id from its data.
+   * @return optional integer that is Some if the controller znode exists and can be parsed and None otherwise.
+   */
+  def getControllerId: Option[Int] = {
+    val response = retryUntilConnected(GetDataRequest(ControllerZNode.path, null)).asInstanceOf[GetDataResponse]
+    Code.get(response.rc) match {
+      case Code.OK => ControllerZNode.decode(response.data)
+      case Code.NONODE => None
+      case errorCode => throw KeeperException.create(errorCode)
+    }
+  }
+
+  /**
+   * Issues a delete for the controller znode. The delete response is not inspected.
+   */
+  def deleteController(): Unit = {
+    retryUntilConnected(DeleteRequest(ControllerZNode.path, -1, null))
+  }
+
+  /**
+   * Reads the controller epoch znode.
+   * @return the decoded epoch paired with the Stat of the znode it was read from, or None if the controller epoch
+   *         path does not exist.
+   */
+  def getControllerEpoch: Option[(Int, Stat)] = {
+    val response = retryUntilConnected(GetDataRequest(ControllerEpochZNode.path, null)).asInstanceOf[GetDataResponse]
+    Code.get(response.rc) match {
+      case Code.OK =>
+        val epoch = ControllerEpochZNode.decode(response.data)
+        Some((epoch, response.stat))
+      case Code.NONODE => None
+      case errorCode => throw KeeperException.create(errorCode)
+    }
+  }
+
+  /**
+   * Recursively deletes the topic znode, i.e. the znode at TopicZNode.path(topic) together with all of its
+   * descendants (children are deleted before their parent by deleteRecursive).
+   * @param topic the topic whose topic znode we wish to delete.
+   */
+  def deleteTopicZNode(topic: String): Unit = {
+    deleteRecursive(TopicZNode.path(topic))
+  }
+
+  /**
+   * Issues a delete for the topic config znode of each given topic. The delete responses are not inspected.
+   * @param topics the topics whose configs we wish to delete.
+   */
+  def deleteTopicConfigs(topics: Seq[String]): Unit = {
+    val configPaths = topics.map(topic => ConfigEntityZNode.path(ConfigType.Topic, topic))
+    val requests = configPaths.map(configPath => DeleteRequest(configPath, -1, null))
+    retryUntilConnected(requests)
+  }
+
+  /**
+   * Registers the given ZNodeChangeHandler and then sends an ExistsRequest for the handler's path, which allows
+   * data watcher registrations on paths which might not even exist.
+   *
+   * @param zNodeChangeHandler the handler to register; its path is the one checked for existence.
+   */
+  def registerZNodeChangeHandlerAndCheckExistence(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+    val existsRequest = ExistsRequest(zNodeChangeHandler.path, null)
+    val existsResponse = retryUntilConnected(existsRequest).asInstanceOf[ExistsResponse]
+    Code.get(existsResponse.rc) match {
+      case Code.OK | Code.NONODE => // both an existing and a missing znode are acceptable
+      case errorCode => throw KeeperException.create(errorCode)
+    }
+  }
+
+  /**
+   * Delegates to the underlying client. See ZookeeperClient.registerZNodeChangeHandler
+   * @param zNodeChangeHandler the handler passed through to the client.
+   */
+  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
+    zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler)
+  }
+
+  /**
+   * Delegates to the underlying client. See ZookeeperClient.unregisterZNodeChangeHandler
+   * @param path the path passed through to the client to identify the handler to unregister.
+   */
+  def unregisterZNodeChangeHandler(path: String): Unit = {
+    zookeeperClient.unregisterZNodeChangeHandler(path)
+  }
+
+  /**
+   * Delegates to the underlying client. See ZookeeperClient.registerZNodeChildChangeHandler
+   * @param zNodeChildChangeHandler the handler passed through to the client.
+   */
+  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
+    zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler)
+  }
+
+  /**
+   * Delegates to the underlying client. See ZookeeperClient.unregisterZNodeChildChangeHandler
+   * @param path the path passed through to the client to identify the handler to unregister.
+   */
+  def unregisterZNodeChildChangeHandler(path: String): Unit = {
+    zookeeperClient.unregisterZNodeChildChangeHandler(path)
+  }
+
+  /**
+   * Close the underlying ZookeeperClient. This method does nothing else; it only forwards the call.
+   */
+  def close(): Unit = {
+    zookeeperClient.close()
+  }
+
+  /**
+   * Deletes the znode at the given path after first deleting all of its descendants (depth first).
+   * A znode that is already gone, whether when listing its children or when deleting it, is not an error;
+   * any other failure code is raised as a KeeperException.
+   * @param path the absolute path of the znode to delete.
+   */
+  private def deleteRecursive(path: String): Unit = {
+    val childrenResponse = retryUntilConnected(GetChildrenRequest(path, null)).asInstanceOf[GetChildrenResponse]
+    Code.get(childrenResponse.rc) match {
+      case Code.OK =>
+        // Children must be removed before the parent delete is attempted.
+        childrenResponse.children.foreach(child => deleteRecursive(s"$path/$child"))
+        val deleteResponse = retryUntilConnected(DeleteRequest(path, -1, null))
+        Code.get(deleteResponse.rc) match {
+          case Code.OK | Code.NONODE =>
+          case deleteError => throw KeeperException.create(deleteError)
+        }
+      case Code.NONODE => // nothing to delete
+      case childrenError => throw KeeperException.create(childrenError)
+    }
+  }
+  /**
+   * Sends a persistent create request for the TopicPartitionZNode of each given partition.
+   * Each request carries its partition as the final CreateRequest argument.
+   * @param partitions the partitions whose znodes should be created.
+   * @return the create responses; result codes are left for the caller to inspect.
+   */
+  private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
+    def toCreateRequest(partition: TopicAndPartition) = {
+      val znodePath = TopicPartitionZNode.path(partition)
+      val znodeData = TopicPartitionZNode.encode
+      CreateRequest(znodePath, znodeData, acls(znodePath), CreateMode.PERSISTENT, partition)
+    }
+    val requests = partitions.map(partition => toCreateRequest(partition))
+    retryUntilConnected(requests).map(_.asInstanceOf[CreateResponse])
+  }
+
+  /**
+   * Sends a persistent create request for the TopicPartitionsZNode of each given topic.
+   * Each request carries its topic as the final CreateRequest argument.
+   * @param topics the topics whose partitions znode should be created.
+   * @return the create responses; result codes are left for the caller to inspect.
+   */
+  private def createTopicPartitions(topics: Seq[String]) = {
+    def toCreateRequest(topic: String) = {
+      val znodePath = TopicPartitionsZNode.path(topic)
+      val znodeData = TopicPartitionsZNode.encode
+      CreateRequest(znodePath, znodeData, acls(znodePath), CreateMode.PERSISTENT, topic)
+    }
+    val requests = topics.map(topic => toCreateRequest(topic))
+    retryUntilConnected(requests).map(_.asInstanceOf[CreateResponse])
+  }
+
+  /**
+   * Sends a get data request for the topic config znode of each given topic.
+   * Each request carries its topic as the second GetDataRequest argument.
+   * @param topics the topics whose config znodes should be read.
+   * @return the get data responses; result codes are left for the caller to inspect.
+   */
+  private def getTopicConfigs(topics: Seq[String]) = {
+    val requests = topics.map(topic => GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), topic))
+    retryUntilConnected(requests).map(_.asInstanceOf[GetDataResponse])
+  }
+
+  /**
+   * Looks up the ACLs for the given path via ZkUtils.defaultAcls, passing along the isSecure flag.
+   * @param path the znode path the ACLs are for.
+   * @return the result of ZkUtils.defaultAcls wrapped as a Scala collection.
+   */
+  private def acls(path: String) = {
+    // Imported locally: only needed here for the asScala conversion.
+    import scala.collection.JavaConverters._
+    ZkUtils.defaultAcls(isSecure, path).asScala
+  }
+
+  /**
+   * Single-request convenience overload: wraps the request in a one-element sequence, delegates to the
+   * sequence overload and returns the first response.
+   * @param request the request to send.
+   * @return the first response returned by the sequence overload.
+   */
+  private def retryUntilConnected(request: AsyncRequest): AsyncResponse = {
+    retryUntilConnected(Seq(request)).head
+  }
+
+  /**
+   * Sends the given requests and keeps resending every request whose response was CONNECTIONLOSS, waiting for
+   * the client to be connected again between attempts.
+   *
+   * @param requests the requests to send.
+   * @return exactly one response per request, none of which is CONNECTIONLOSS. Responses of requests that had to
+   *         be retried come after those that completed in an earlier attempt, so responses should be correlated
+   *         with requests through their ctx rather than by position.
+   */
+  private def retryUntilConnected(requests: Seq[AsyncRequest]): Seq[AsyncResponse] = {
+    var remaining = requests
+    var responses: Seq[AsyncResponse] = Seq.empty[AsyncResponse]
+    while (remaining.nonEmpty) {
+      // Keep each attempt's responses separate from the accumulated result. Assigning them to `responses`
+      // would drop the responses collected by earlier attempts, keep the CONNECTIONLOSS responses and
+      // return every completed response twice.
+      val batchResponses = zookeeperClient.handle(remaining)
+      val (completed, connectionLoss) = remaining.zip(batchResponses).partition { case (_, response) =>
+        Code.get(response.rc) != Code.CONNECTIONLOSS
+      }
+      responses ++= completed.map { case (_, response) => response }
+      remaining = connectionLoss.map { case (request, _) => request }
+      if (remaining.nonEmpty) zookeeperClient.waitUntilConnected()
+    }
+    responses
+  }
+
+  /**
+   * Creates an ephemeral znode at the given path through CheckedEphemeral and logs the outcome.
+   * @param path the path of the ephemeral znode to create.
+   * @param data the data to store in the znode.
+   * @throws KeeperException if the creation result is anything other than OK.
+   */
+  def checkedEphemeralCreate(path: String, data: Array[Byte]): Unit = {
+    info(s"Creating $path (is it secure? $isSecure)")
+    val code = new CheckedEphemeral(path, data).create()
+    info(s"Result of znode creation at $path is: $code")
+    if (code != Code.OK)
+      throw KeeperException.create(code)
+  }
+
+  /**
+   * Creates an ephemeral znode and, if the znode already exists, checks whether it is owned by the current
+   * session of the underlying client.
+   * @param path the path of the ephemeral znode.
+   * @param data the data to store in the znode.
+   */
+  private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+    /**
+     * Attempts to create the ephemeral znode.
+     * @return OK if the znode was created or already exists and is owned by the current session, NODEEXISTS if
+     *         it exists but is owned by a different session, or the failing result code otherwise.
+     */
+    def create(): Code = {
+      val createRequest = CreateRequest(path, data, acls(path), CreateMode.EPHEMERAL, null)
+      val createResponse = retryUntilConnected(createRequest)
+      val code = Code.get(createResponse.rc)
+      if (code == Code.OK) {
+        code
+      } else if (code == Code.NODEEXISTS) {
+        // The znode is already there: find out whether this session owns it.
+        get()
+      } else {
+        error(s"Error while creating ephemeral at $path with return code: $code")
+        code
+      }
+    }
+
+    /**
+     * Reads the existing znode and compares its ephemeral owner with the current session id.
+     * If the znode disappeared in the meantime, creation is attempted again; the create/get cycle repeats for
+     * as long as the znode keeps appearing and disappearing between the two calls.
+     */
+    private def get(): Code = {
+      val getDataRequest = GetDataRequest(path, null)
+      val getDataResponse = retryUntilConnected(getDataRequest).asInstanceOf[GetDataResponse]
+      val code = Code.get(getDataResponse.rc)
+      if (code == Code.OK) {
+        val owner = getDataResponse.stat.getEphemeralOwner
+        val currentSession = zookeeperClient.sessionId
+        if (owner != currentSession) {
+          // The read itself succeeded, so the return code would say OK; report the actual reason instead.
+          error(s"Error while creating ephemeral at $path, node already exists and owner '$owner' " +
+            s"does not match current session '$currentSession'")
+          Code.NODEEXISTS
+        } else {
+          code
+        }
+      } else if (code == Code.NONODE) {
+        info(s"The ephemeral node at $path went away while reading it")
+        create()
+      } else {
+        error(s"Error while creating ephemeral at $path with return code: $code")
+        code
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
deleted file mode 100644
index e534ff3..0000000
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.controller
-
-import kafka.admin.AdminUtils
-import kafka.api.LeaderAndIsr
-import kafka.common.{LeaderElectionNotNeededException, NoReplicaOnlineException, StateChangeFailedException, TopicAndPartition}
-import kafka.log.LogConfig
-import kafka.server.{ConfigType, KafkaConfig}
-import kafka.utils.Logging
-
-trait PartitionLeaderSelector {
-
-  /**
-   * @param topicAndPartition          The topic and partition whose leader needs to be elected
-   * @param currentLeaderAndIsr        The current leader and isr of input partition read from zookeeper
-   * @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive
-   * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
-   * the LeaderAndIsrRequest.
-   */
-  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
-
-}
-
-/**
- * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
- * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
- *    isr as the new isr.
- * 2. Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
- * 3. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
- * 4. If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
- * Replicas to receive LeaderAndIsr request = live assigned replicas
- * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
- */
-class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, config: KafkaConfig)
-  extends PartitionLeaderSelector with Logging {
-
-  logIdent = "[OfflinePartitionLeaderSelector]: "
-
-  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
-      case Some(assignedReplicas) =>
-        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition))
-        val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition))
-        val newLeaderAndIsr =
-          if (liveBrokersInIsr.isEmpty) {
-            // Prior to electing an unclean (i.e. non-ISR) leader, ensure that doing so is not disallowed by the configuration
-            // for unclean leader election.
-            if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(controllerContext.zkUtils,
-              ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
-              throw new NoReplicaOnlineException(
-                s"No replica in ISR for partition $topicAndPartition is alive. Live brokers are: [${controllerContext.liveBrokerIds}], " +
-                  s"ISR brokers are: [${currentLeaderAndIsr.isr.mkString(",")}]"
-              )
-            }
-            debug(s"No broker in ISR is alive for $topicAndPartition. Pick the leader from the alive assigned " +
-              s"replicas: ${liveAssignedReplicas.mkString(",")}")
-
-            if (liveAssignedReplicas.isEmpty) {
-              throw new NoReplicaOnlineException(s"No replica for partition $topicAndPartition is alive. Live " +
-                s"brokers are: [${controllerContext.liveBrokerIds}]. Assigned replicas are: [$assignedReplicas].")
-            } else {
-              controllerContext.stats.uncleanLeaderElectionRate.mark()
-              val newLeader = liveAssignedReplicas.head
-              warn(s"No broker in ISR is alive for $topicAndPartition. Elect leader $newLeader from live " +
-                s"brokers ${liveAssignedReplicas.mkString(",")}. There's potential data loss.")
-              currentLeaderAndIsr.newLeaderAndIsr(newLeader, List(newLeader))
-            }
-          } else {
-            val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
-            val newLeader = liveReplicasInIsr.head
-            debug(s"Some broker in ISR is alive for $topicAndPartition. Select $newLeader from ISR " +
-              s"${liveBrokersInIsr.mkString(",")} to be the leader.")
-            currentLeaderAndIsr.newLeaderAndIsr(newLeader, liveBrokersInIsr)
-          }
-        info(s"Selected new leader and ISR $newLeaderAndIsr for offline partition $topicAndPartition")
-        (newLeaderAndIsr, liveAssignedReplicas)
-      case None =>
-        throw new NoReplicaOnlineException(s"Partition $topicAndPartition doesn't have replicas assigned to it")
-    }
-  }
-}
-
-/**
- * New leader = a live in-sync reassigned replica
- * New isr = current isr
- * Replicas to receive LeaderAndIsr request = reassigned replicas
- */
-class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
-
-  logIdent = "[ReassignedPartitionLeaderSelector]: "
-
-  /**
-   * The reassigned replicas are already in the ISR when selectLeader is called.
-   */
-  def selectLeader(topicAndPartition: TopicAndPartition,
-                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
-    val newLeaderOpt = reassignedInSyncReplicas.find { r =>
-      controllerContext.isReplicaOnline(r, topicAndPartition) && currentLeaderAndIsr.isr.contains(r)
-    }
-    newLeaderOpt match {
-      case Some(newLeader) => (currentLeaderAndIsr.newLeader(newLeader), reassignedInSyncReplicas)
-      case None =>
-        val errorMessage = if (reassignedInSyncReplicas.isEmpty) {
-          s"List of reassigned replicas for partition $topicAndPartition is empty. Current leader and ISR: " +
-            s"[$currentLeaderAndIsr]"
-        } else {
-          s"None of the reassigned replicas for partition $topicAndPartition are in-sync with the leader. " +
-            s"Current leader and ISR: [$currentLeaderAndIsr]"
-        }
-        throw new NoReplicaOnlineException(errorMessage)
-    }
-  }
-}
-
-/**
- * New leader = preferred (first assigned) replica (if in isr and alive);
- * New isr = current isr;
- * Replicas to receive LeaderAndIsr request = assigned replicas
- */
-class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
-
-  logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
-
-  def selectLeader(topicAndPartition: TopicAndPartition,
-                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-    val preferredReplica = assignedReplicas.head
-    // check if preferred replica is the current leader
-    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
-    if (currentLeader == preferredReplica) {
-      throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s"
-                                                   .format(preferredReplica, topicAndPartition))
-    } else {
-      info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
-        " Triggering preferred replica leader election")
-      // check if preferred replica is not the current leader and is alive and in the isr
-      if (controllerContext.isReplicaOnline(preferredReplica, topicAndPartition) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
-        val newLeaderAndIsr = currentLeaderAndIsr.newLeader(preferredReplica)
-        (newLeaderAndIsr, assignedReplicas)
-      } else {
-        throw new StateChangeFailedException(s"Preferred replica $preferredReplica for partition $topicAndPartition " +
-          s"is either not alive or not in the isr. Current leader and ISR: [$currentLeaderAndIsr]")
-      }
-    }
-  }
-}
-
-/**
- * New leader = replica in isr that's not being shutdown;
- * New isr = current isr - shutdown replica;
- * Replicas to receive LeaderAndIsr request = live assigned replicas
- */
-class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
-
-  logIdent = "[ControlledShutdownLeaderSelector]: "
-
-  def selectLeader(topicAndPartition: TopicAndPartition,
-                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val currentIsr = currentLeaderAndIsr.isr
-    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-    val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition, true))
-
-    val newIsr = currentIsr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
-    liveAssignedReplicas.find(newIsr.contains) match {
-      case Some(newLeader) =>
-        debug(s"Partition $topicAndPartition : current leader = ${currentLeaderAndIsr.leader}, new leader = $newLeader")
-        val newLeaderAndIsr = currentLeaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
-        (newLeaderAndIsr, liveAssignedReplicas)
-      case None =>
-        throw new StateChangeFailedException(s"No other replicas in ISR ${currentIsr.mkString(",")} for $topicAndPartition " +
-          s"besides shutting down brokers ${controllerContext.shuttingDownBrokerIds.mkString(",")}")
-    }
-  }
-}
-
-/**
- * Essentially does nothing. Returns the current leader and ISR, and the current
- * set of replicas assigned to a given topic/partition.
- */
-class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
-
-  logIdent = "[NoOpLeaderSelector]: "
-
-  def selectLeader(topicAndPartition: TopicAndPartition,
-                   currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
-    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 9e75bc0..16e1486 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -17,13 +17,14 @@
 package kafka.controller
 
 import kafka.api.LeaderAndIsr
-import kafka.common.{LeaderElectionNotNeededException, NoReplicaOnlineException, StateChangeFailedException, TopicAndPartition}
-import kafka.controller.Callbacks.CallbackBuilder
-import kafka.utils.ZkUtils._
-import kafka.utils.{Logging, ReplicationUtils}
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.common.{StateChangeFailedException, TopicAndPartition}
+import kafka.server.KafkaConfig
+import kafka.utils.Logging
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.Code
+
+import scala.collection.mutable
 
-import scala.collection._
 
 /**
  * This class represents the state machine for partitions. It defines the states that a partition can be in, and
@@ -37,26 +38,25 @@ import scala.collection._
  * 4. OfflinePartition    : If, after successful leader election, the leader for partition dies, then the partition
  *                          moves to the OfflinePartition state. Valid previous states are NewPartition/OnlinePartition
  */
-class PartitionStateMachine(controller: KafkaController, stateChangeLogger: StateChangeLogger) extends Logging {
-
-  private val controllerContext = controller.controllerContext
-  private val controllerId = controller.config.brokerId
-  private val zkUtils = controllerContext.zkUtils
-  private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller, stateChangeLogger)
-  private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
+class PartitionStateMachine(config: KafkaConfig,
+                            stateChangeLogger: StateChangeLogger,
+                            controllerContext: ControllerContext,
+                            topicDeletionManager: TopicDeletionManager,
+                            zkUtils: KafkaControllerZkUtils,
+                            partitionState: mutable.Map[TopicAndPartition, PartitionState],
+                            controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
+  private val controllerId = config.brokerId
 
   this.logIdent = s"[PartitionStateMachine controllerId=$controllerId] "
 
   /**
-   * Invoked on successful controller election. First registers a topic change listener since that triggers all
-   * state transitions for partitions. Initializes the state of partitions by reading from zookeeper. Then triggers
-   * the OnlinePartition state change for all new or offline partitions.
+   * Invoked on successful controller election.
    */
   def startup() {
+    info("Initializing partition state")
     initializePartitionState()
+    info("Triggering online partition state changes")
     triggerOnlinePartitionStateChange()
-
     info(s"Started partition state machine with initial state -> $partitionState")
   }
 
@@ -65,57 +65,63 @@ class PartitionStateMachine(controller: KafkaController, stateChangeLogger: Stat
    */
   def shutdown() {
     partitionState.clear()
-
     info("Stopped partition state machine")
   }
 
   /**
-   * This API invokes the OnlinePartition state change on all partitions in either the NewPartition or OfflinePartition
-   * state. This is called on a successful controller election and on broker changes
+   * Invoked on startup of the partition's state machine to set the initial state for all existing partitions in
+   * zookeeper
    */
-  def triggerOnlinePartitionStateChange() {
-    try {
-      brokerRequestBatch.newBatch()
-      // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
-      // that belong to topics to be deleted
-      for ((topicAndPartition, partitionState) <- partitionState
-          if !controller.topicDeletionManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
-        if (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
-          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
-                            (new CallbackBuilder).build)
+  private def initializePartitionState() {
+    for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
+      // check if leader and isr path exists for partition. If not, then it is in NEW state
+      controllerContext.partitionLeadershipInfo.get(topicPartition) match {
+        case Some(currentLeaderIsrAndEpoch) =>
+          // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
+          if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition))
+          // leader is alive
+            partitionState.put(topicPartition, OnlinePartition)
+          else
+            partitionState.put(topicPartition, OfflinePartition)
+        case None =>
+          partitionState.put(topicPartition, NewPartition)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
-    } catch {
-      case e: Throwable => error("Error while moving some partitions to the online state", e)
-      // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
     }
   }
 
-  def partitionsInState(state: PartitionState): Set[TopicAndPartition] = {
-    partitionState.filter(p => p._2 == state).keySet
-  }
-
   /**
-   * This API is invoked by the partition change zookeeper listener
-   * @param partitions   The list of partitions that need to be transitioned to the target state
-   * @param targetState  The state that the partitions should be moved to
+   * This API invokes the OnlinePartition state change on all partitions in either the NewPartition or OfflinePartition
+   * state. This is called on a successful controller election and on broker changes
    */
-  def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
-                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
-                         callbacks: Callbacks = (new CallbackBuilder).build) {
-    info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
-    try {
-      brokerRequestBatch.newBatch()
-      partitions.foreach { topicAndPartition =>
-        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
+  def triggerOnlinePartitionStateChange() {
+    // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
+    // that belong to topics to be deleted
+    val partitionsToTrigger = partitionState.filter { case (partition, partitionState) =>
+      !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic) &&
+        (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
+    }.keys.toSeq
+    handleStateChanges(partitionsToTrigger, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
+    // TODO: If handleStateChanges catches an exception, it is not enough to bail out and log an error.
+    // It is important to trigger leader election for those partitions.
+  }
+
+  def handleStateChanges(partitions: Seq[TopicAndPartition], targetState: PartitionState,
+                         partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = {
+    if (partitions.nonEmpty) {
+      try {
+        controllerBrokerRequestBatch.newBatch()
+        doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
+        controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
+      } catch {
+        case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
       }
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
-    } catch {
-      case e: Throwable => error("Error while moving some partitions to %s state".format(targetState), e)
-      // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
     }
   }
 
+  def partitionsInState(state: PartitionState): Set[TopicAndPartition] = {
+    partitionState.filter { case (_, s) => s == state }.keySet.toSet
+  }
+
   /**
    * This API exercises the partition's state machine. It ensures that every state transition happens from a legal
    * previous state to the target state. Valid state transitions are:
@@ -135,207 +141,331 @@ class PartitionStateMachine(controller: KafkaController, stateChangeLogger: Stat
    *
    * OfflinePartition -> NonExistentPartition
    * --nothing other than marking the partition state as NonExistentPartition
-   * @param topic       The topic of the partition for which the state transition is invoked
-   * @param partition   The partition for which the state transition is invoked
+   * @param partitions  The partitions for which the state transition is invoked
    * @param targetState The end state that the partition should be moved to
    */
-  private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
-                                leaderSelector: PartitionLeaderSelector,
-                                callbacks: Callbacks) {
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
-    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
-    try {
-      assertValidTransition(topicAndPartition, targetState)
-      targetState match {
-        case NewPartition =>
-          partitionState.put(topicAndPartition, NewPartition)
-          val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
-          stateChangeLog.trace(s"Changed partition $topicAndPartition state from $currState to $targetState with " +
-            s"assigned replicas $assignedReplicas")
-          // post: partition has been assigned replicas
-        case OnlinePartition =>
-          partitionState(topicAndPartition) match {
-            case NewPartition =>
-              // initialize leader and isr path for new partition
-              initializeLeaderAndIsrForPartition(topicAndPartition)
-            case OfflinePartition =>
-              electLeaderForPartition(topic, partition, leaderSelector)
-            case OnlinePartition => // invoked when the leader needs to be re-elected
-              electLeaderForPartition(topic, partition, leaderSelector)
-            case _ => // should never come here since illegal previous states are checked above
+  private def doHandleStateChanges(partitions: Seq[TopicAndPartition], targetState: PartitionState,
+                           partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = {
+    val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
+    partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition))
+    val (validPartitions, invalidPartitions) = partitions.partition(partition => isValidTransition(partition, targetState))
+    invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
+    targetState match {
+      case NewPartition =>
+        validPartitions.foreach { partition =>
+          stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState with " +
+            s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}")
+          partitionState.put(partition, NewPartition)
+        }
+      case OnlinePartition =>
+        val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)
+        val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)
+        if (uninitializedPartitions.nonEmpty) {
+          val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
+          successfulInitializations.foreach { partition =>
+            stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
+              s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
+            partitionState.put(partition, OnlinePartition)
           }
-          partitionState.put(topicAndPartition, OnlinePartition)
-          val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
-          stateChangeLog.trace(s"Changed partition $topicAndPartition from $currState to $targetState with leader $leader")
-           // post: partition has a leader
-        case OfflinePartition =>
-          // should be called when the leader for a partition is no longer alive
-          stateChangeLog.trace(s"Changed partition $topicAndPartition state from $currState to $targetState")
-          partitionState.put(topicAndPartition, OfflinePartition)
-          // post: partition has no alive leader
-        case NonExistentPartition =>
-          stateChangeLogger.trace(s"Changed partition $topicAndPartition state from $currState to $targetState")
-          partitionState.put(topicAndPartition, NonExistentPartition)
-          // post: partition state is deleted from all brokers and zookeeper
-      }
-    } catch {
-      case t: Throwable =>
-        stateChangeLog.error(s"Initiated state change for partition $topicAndPartition from $currState to $targetState failed",
-          t)
+        }
+        if (partitionsToElectLeader.nonEmpty) {
+          val successfulElections = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get)
+          successfulElections.foreach { partition =>
+            stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
+              s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
+            partitionState.put(partition, OnlinePartition)
+          }
+        }
+      case OfflinePartition =>
+        validPartitions.foreach { partition =>
+          stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
+          partitionState.put(partition, OfflinePartition)
+        }
+      case NonExistentPartition =>
+        validPartitions.foreach { partition =>
+          stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
+          partitionState.put(partition, NonExistentPartition)
+        }
     }
   }
 
   /**
-   * Invoked on startup of the partition's state machine to set the initial state for all existing partitions in
-   * zookeeper
+   * Initialize leader and isr partition state in zookeeper.
+   * @param partitions The partitions that we're trying to initialize.
+   * @return The partitions that have been successfully initialized.
    */
-  private def initializePartitionState() {
-    for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
-      // check if leader and isr path exists for partition. If not, then it is in NEW state
-      controllerContext.partitionLeadershipInfo.get(topicPartition) match {
-        case Some(currentLeaderIsrAndEpoch) =>
-          // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
-          if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition))
-            // leader is alive
-            partitionState.put(topicPartition, OnlinePartition)
-          else
-            partitionState.put(topicPartition, OfflinePartition)
-        case None =>
-          partitionState.put(topicPartition, NewPartition)
+  private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicAndPartition]): Seq[TopicAndPartition] = {
+    val successfulInitializations = mutable.Buffer.empty[TopicAndPartition]
+    // Pair every partition with the subset of its assigned replicas that is currently online.
+    val liveReplicasPerPartition = partitions.map { partition =>
+      val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
+      partition -> assignedReplicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+    }
+    val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
+
+    partitionsWithoutLiveReplicas.foreach { case (partition, _) =>
+      // Report the full assignment: the live-replica list is empty by construction for these partitions,
+      // so printing it as the "assigned replicas" would always show an empty list.
+      val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
+      val failMsg = s"Controller $controllerId epoch ${controllerContext.epoch} encountered error during state change of " +
+        s"partition $partition from New to Online, assigned replicas are " +
+        s"[${assignedReplicas.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +
+        "replica is alive."
+      logFailedStateChange(partition, NewPartition, OnlinePartition, new StateChangeFailedException(failMsg))
+    }
+    // The first live replica becomes the leader and all live replicas form the initial isr.
+    val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
+      val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
+      val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
+      partition -> leaderIsrAndControllerEpoch
+    }.toMap
+    val createResponses = try {
+      zkUtils.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
+    } catch {
+      case e: Exception =>
+        // The attempted transition is towards OnlinePartition, so that is the target state to report.
+        partitionsWithLiveReplicas.foreach { case (partition, _) => logFailedStateChange(partition, partitionState(partition), OnlinePartition, e) }
+        Seq.empty
+    }
+    createResponses.foreach { createResponse =>
+      val code = Code.get(createResponse.rc)
+      val partition = createResponse.ctx.asInstanceOf[TopicAndPartition]
+      val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
+      if (code == Code.OK) {
+        // Only partitions whose state was created successfully are cached and announced to the isr brokers.
+        controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+        controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr,
+          partition.topic,
+          partition.partition,
+          leaderIsrAndControllerEpoch,
+          controllerContext.partitionReplicaAssignment(partition),
+          isNew = true)
+        successfulInitializations += partition
+      } else {
+        logFailedStateChange(partition, NewPartition, OnlinePartition, code)
       }
     }
+    successfulInitializations
   }
 
-  private def assertValidTransition(topicAndPartition: TopicAndPartition, targetState: PartitionState): Unit = {
-    if (!targetState.validPreviousStates.contains(partitionState(topicAndPartition)))
-      throw new IllegalStateException("Partition %s should be in the %s states before moving to %s state"
-        .format(topicAndPartition, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state"
-        .format(partitionState(topicAndPartition)))
+  /**
+   * Repeatedly attempt to elect leaders for multiple partitions until there are no more remaining partitions to retry.
+   * Each pass only re-attempts the partitions that the previous pass asked to retry, so partitions that have
+   * already succeeded or failed are neither elected nor logged a second time.
+   * @param partitions The partitions that we're trying to elect leaders for.
+   * @param partitionLeaderElectionStrategy The election strategy to use.
+   * @return The partitions that successfully had a leader elected.
+   */
+  private def electLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicAndPartition] = {
+    val successfulElections = mutable.Buffer.empty[TopicAndPartition]
+    var remaining = partitions
+    while (remaining.nonEmpty) {
+      // Retry only what is still outstanding, not the full original input.
+      val (success, updatesToRetry, failedElections) = doElectLeaderForPartitions(remaining, partitionLeaderElectionStrategy)
+      remaining = updatesToRetry
+      successfulElections ++= success
+      failedElections.foreach { case (partition, e) =>
+        logFailedStateChange(partition, partitionState(partition), OnlinePartition, e)
+      }
+    }
+    successfulElections
   }
 
   /**
-   * Invoked on the NewPartition->OnlinePartition state change. When a partition is in the New state, it does not have
-   * a leader and isr path in zookeeper. Once the partition moves to the OnlinePartition state, its leader and isr
-   * path gets initialized and it never goes back to the NewPartition state. From here, it can only go to the
-   * OfflinePartition state.
-   * @param topicAndPartition   The topic/partition whose leader and isr path is to be initialized
+   * Try to elect leaders for multiple partitions.
+   * Electing a leader for a partition updates partition state in zookeeper.
+   *
+   * @param partitions The partitions that we're trying to elect leaders for.
+   * @param partitionLeaderElectionStrategy The election strategy to use.
+   * @return A tuple of three values:
+   *         1. The partitions that successfully had a leader elected.
+   *         2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
+   *         the partition leader updated partition state while the controller attempted to update partition state.
+   *         3. Exceptions corresponding to failed elections that should not be retried.
    */
-  private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) = {
-    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition).toList
-    val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.isReplicaOnline(r, topicAndPartition))
-    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
-    liveAssignedReplicas.headOption match {
-      case None =>
-        val failMsg = s"Encountered error during state change of partition $topicAndPartition from New to Online, " +
-          s"assigned replicas are [${replicaAssignment.mkString(",")}], live brokers are " +
-          s"[${controllerContext.liveBrokerIds}]. No assigned replica is alive."
-        stateChangeLog.error(failMsg)
-        throw new StateChangeFailedException(stateChangeLog.messageWithPrefix(failMsg))
-
-      // leader is the first replica in the list of assigned replicas
-      case Some(leader) =>
-        debug(s"Live assigned replicas for partition $topicAndPartition are: [$liveAssignedReplicas]")
-        val leaderAndIsr = LeaderAndIsr(leader, liveAssignedReplicas)
-        val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controller.epoch)
-        debug(s"Initializing leader and isr for partition $topicAndPartition to $leaderIsrAndControllerEpoch")
-
-        try {
-          zkUtils.createPersistentPath(
-            getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
-            zkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch)
-          )
-          // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
-          // took over and initialized this partition. This can happen if the current controller went into a long
-          // GC pause
-          controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
-            liveAssignedReplicas,
-            topicAndPartition.topic,
-            topicAndPartition.partition,
-            leaderIsrAndControllerEpoch,
-            replicaAssignment,
-            isNew = true
-          )
-        } catch {
-          case _: ZkNodeExistsException =>
-            // read the controller epoch
-            val leaderIsrAndEpoch = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topicAndPartition.topic,
-              topicAndPartition.partition).get
-
-            val failMsg = s"Encountered error while changing partition $topicAndPartition's state from New to Online " +
-              s"since LeaderAndIsr path already exists with value ${leaderIsrAndEpoch.leaderAndIsr} and controller " +
-              s"epoch ${leaderIsrAndEpoch.controllerEpoch}"
-            stateChangeLog.error(failMsg)
-            throw new StateChangeFailedException(stateChangeLog.messageWithPrefix(failMsg))
+  private def doElectLeaderForPartitions(partitions: Seq[TopicAndPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
+  (Seq[TopicAndPartition],
+    Seq[TopicAndPartition],
+    Map[TopicAndPartition, Exception]) = {
+    // Read the current leader and isr state of every partition. If the read itself throws, every partition is
+    // reported as failed and nothing is retried.
+    val getDataResponses = try {
+      zkUtils.getTopicPartitionStatesRaw(partitions)
+    } catch {
+      case e: Exception =>
+        return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
+    }
+    val failedElections = mutable.Map.empty[TopicAndPartition, Exception]
+    val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicAndPartition, LeaderIsrAndControllerEpoch)]
+    getDataResponses.foreach { getDataResponse =>
+      val partition = getDataResponse.ctx.asInstanceOf[TopicAndPartition]
+      val currState = partitionState(partition)
+      if (Code.get(getDataResponse.rc) == Code.OK) {
+        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            leaderIsrAndControllerEpochPerPartition += partition -> leaderIsrAndControllerEpoch
+          case None =>
+            // Missing state is a failure for this partition only; it must not reach the election step
+            // (previously the empty Option was still unwrapped and threw NoSuchElementException).
+            val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
+            failedElections.put(partition, exception)
         }
+      } else if (Code.get(getDataResponse.rc) == Code.NONODE) {
+        val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
+        failedElections.put(partition, exception)
+      } else {
+        failedElections.put(partition, KeeperException.create(Code.get(getDataResponse.rc)))
+      }
+    }
+    // State written under a higher controller epoch belongs to a newer controller and must not be overwritten.
+    val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
+      leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
+    }
+    invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
+      val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
+        s"already written by another controller. This probably means that the current controller $controllerId went through " +
+        s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
+      failedElections.put(partition, new StateChangeFailedException(failMsg))
+    }
+    if (validPartitionsForElection.isEmpty) {
+      return (Seq.empty, Seq.empty, failedElections.toMap)
+    }
+    val shuttingDownBrokers = controllerContext.shuttingDownBrokerIds.toSet
+    // Each strategy yields (partition, new LeaderAndIsr if a leader was found, brokers to notify); split on whether
+    // a leader could be chosen.
+    val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
+      case OfflinePartitionLeaderElectionStrategy =>
+        leaderForOffline(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
+      case ReassignPartitionLeaderElectionStrategy =>
+        leaderForReassign(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
+      case PreferredReplicaPartitionLeaderElectionStrategy =>
+        leaderForPreferredReplica(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
+      case ControlledShutdownPartitionLeaderElectionStrategy =>
+        leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
     }
+    partitionsWithoutLeaders.foreach { case (partition, _, _) =>
+      val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
+      failedElections.put(partition, new StateChangeFailedException(failMsg))
+    }
+    val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap
+    // Safe to unwrap: partitionsWithLeaders only holds entries whose Option is defined.
+    val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
+    val (successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch)
+    successfulUpdates.foreach { case (partition, leaderAndIsr) =>
+      val replicas = controllerContext.partitionReplicaAssignment(partition)
+      val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
+      controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+      controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition.topic,
+        partition.partition, leaderIsrAndControllerEpoch, replicas, isNew = false)
+    }
+    (successfulUpdates.keys.toSeq, updatesToRetry, failedElections.toMap ++ failedUpdates)
   }
 
-  /**
-   * Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state change.
-   * It invokes the leader election API to elect a leader for the input offline partition
-   * @param topic               The topic of the offline partition
-   * @param partition           The offline partition
-   * @param leaderSelector      Specific leader selector (e.g., offline/reassigned/etc.)
-   */
-  def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
-    // handle leader election for the partitions whose leader is no longer alive
-    stateChangeLog.trace(s"Started leader election for partition $topicAndPartition")
-    try {
-      var zookeeperPathUpdateSucceeded: Boolean = false
-      var newLeaderAndIsr: LeaderAndIsr = null
-      var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
-      while(!zookeeperPathUpdateSucceeded) {
-        val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
-        val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
-        val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
-        if (controllerEpoch > controller.epoch) {
-          val failMsg = s"Aborted leader election for partition $topicAndPartition since the LeaderAndIsr path was " +
-            s"already written by another controller. This probably means that the current controller $controllerId went " +
-            s"through a soft failure and another controller was elected with epoch $controllerEpoch."
-          stateChangeLog.error(failMsg)
-          throw new StateChangeFailedException(stateChangeLog.messageWithPrefix(failMsg))
+  private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)]):
+  Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+    val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderIsrAndControllerEpochs.partition { case (partition, leaderIsrAndControllerEpoch) =>
+      val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+      liveInSyncReplicas.isEmpty
+    }
+    val (logConfigs, failed) = zkUtils.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }, config.originals())
+    val partitionsWithUncleanLeaderElectionState = partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
+      if (failed.contains(partition.topic)) {
+        logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
+        (partition, None, false)
+      } else {
+        (partition, Option(leaderIsrAndControllerEpoch), logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue())
+      }
+    } ++ partitionsWithLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) => (partition, Option(leaderIsrAndControllerEpoch), false) }
+    partitionsWithUncleanLeaderElectionState.map { case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
+      val assignment = controllerContext.partitionReplicaAssignment(partition)
+      val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+      if (leaderIsrAndControllerEpochOpt.nonEmpty) {
+        val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
+        val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+        val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled)
+        val newLeaderAndIsrOpt = leaderOpt.map { leader =>
+          val newIsr = if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+          else List(leader)
+          leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)
         }
-        // elect new leader or throw exception
-        val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
-        val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
-          leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
-        newLeaderAndIsr = leaderAndIsr.withZkVersion(newVersion)
-        zookeeperPathUpdateSucceeded = updateSucceeded
-        replicasForThisPartition = replicas
+        (partition, newLeaderAndIsrOpt, liveReplicas)
+      } else {
+        (partition, None, liveReplicas)
       }
-      val newLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
-      // update the leader cache
-      controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
-      stateChangeLog.trace(s"Elected leader ${newLeaderAndIsr.leader} for Offline partition $topicAndPartition")
-      val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
-      // store new leader and isr info in cache
-      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
-        newLeaderIsrAndControllerEpoch, replicas)
-    } catch {
-      case _: LeaderElectionNotNeededException => // swallow
-      case nroe: NoReplicaOnlineException => throw nroe
-      case sce: Throwable =>
-        val failMsg = s"Encountered error while electing leader for partition $topicAndPartition due to: ${sce.getMessage}"
-        stateChangeLog.error(failMsg)
-        throw new StateChangeFailedException(stateChangeLog.messageWithPrefix(failMsg), sce)
     }
-    debug(s"After leader election, leader cache is updated to ${controllerContext.partitionLeadershipInfo}")
   }
 
-  private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition) match {
-      case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
-      case None => throw new StateChangeFailedException("LeaderAndIsr information doesn't exist for " +
-        s"partition $topicAndPartition in ${partitionState(topicAndPartition)} state")
+  /**
+   * Computes the election outcome for partitions that are being reassigned.
+   * For each partition the candidate leaders are the new replicas of its pending reassignment.
+   * @param leaderIsrAndControllerEpochs The partitions together with their current leader and isr state.
+   * @return Per partition: the new LeaderAndIsr if a leader could be chosen, and the reassignment's new replicas.
+   */
+  private def leaderForReassign(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)]):
+  Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+      val targetReplicas = controllerContext.partitionsBeingReassigned(partition).newReplicas
+      val onlineTargets = targetReplicas.filter(replica => controllerContext.isReplicaOnline(replica, partition)).toSet
+      val electedOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(targetReplicas, currentLeaderAndIsr.isr, onlineTargets)
+      (partition, electedOpt.map(leader => currentLeaderAndIsr.newLeader(leader)), targetReplicas)
+    }
+  }
+
+  /**
+   * Computes the election outcome when trying to move leadership to the preferred replica of each partition.
+   * @param leaderIsrAndControllerEpochs The partitions together with their current leader and isr state.
+   * @return Per partition: the new LeaderAndIsr if a leader could be chosen, and the partition's assigned replicas.
+   */
+  private def leaderForPreferredReplica(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)]):
+  Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      val currentLeaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+      val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
+      val onlineReplicas = assignedReplicas.filter(replica => controllerContext.isReplicaOnline(replica, partition)).toSet
+      val electedOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignedReplicas, currentLeaderAndIsr.isr, onlineReplicas)
+      (partition, electedOpt.map(leader => currentLeaderAndIsr.newLeader(leader)), assignedReplicas)
+    }
+  }
+
+  /**
+   * Computes the election outcome for partitions affected by a controlled broker shutdown.
+   * The same set of shutting down brokers is used both to pick the leader and to shrink the isr, so the
+   * two decisions cannot disagree.
+   * @param leaderIsrAndControllerEpochs The partitions together with their current leader and isr state.
+   * @param shuttingDownBrokers The brokers that are shutting down; they are excluded from leadership and from the new isr.
+   * @return Per partition: the new LeaderAndIsr if a leader could be chosen, and the live assigned replicas.
+   */
+  private def leaderForControlledShutdown(leaderIsrAndControllerEpochs: Seq[(TopicAndPartition, LeaderIsrAndControllerEpoch)], shuttingDownBrokers: Set[Int]):
+  Seq[(TopicAndPartition, Option[LeaderAndIsr], Seq[Int])] = {
+    leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
+      val assignment = controllerContext.partitionReplicaAssignment(partition)
+      val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
+      val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
+      val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment, isr, liveReplicas.toSet, shuttingDownBrokers)
+      // Use the parameter rather than re-reading controllerContext, matching what the election above was given.
+      val newIsr = isr.filter(replica => !shuttingDownBrokers.contains(replica))
+      val newLeaderAndIsrOpt = leaderOpt.map(leader => leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr))
+      (partition, newLeaderAndIsrOpt, liveReplicas)
+    }
+  }
+
+  /** Returns true when the partition's currently tracked state is a valid previous state of `targetState`. */
+  private def isValidTransition(partition: TopicAndPartition, targetState: PartitionState) = {
+    val currentState = partitionState(partition)
+    targetState.validPreviousStates.contains(currentState)
+  }
+
+  /** Logs a failed state change for a partition whose current state does not allow moving to `targetState`. */
+  private def logInvalidTransition(partition: TopicAndPartition, targetState: PartitionState): Unit = {
+    val current = partitionState(partition)
+    val expectedStates = targetState.validPreviousStates.mkString(",")
+    val message = "Partition %s should be in the %s states before moving to %s state".format(partition, expectedStates, targetState) +
+      ". Instead it is in %s state".format(current)
+    logFailedStateChange(partition, current, targetState, new IllegalStateException(message))
+  }
+
+  /** Logs a failed state change whose cause is a zookeeper result code, by wrapping the code in a KeeperException. */
+  private def logFailedStateChange(partition: TopicAndPartition, currState: PartitionState, targetState: PartitionState, code: Code): Unit = {
+    val zkException = KeeperException.create(code)
+    logFailedStateChange(partition, currState, targetState, zkException)
+  }
+
+  /** Writes an error to the state change log describing the failed transition together with its cause. */
+  private def logFailedStateChange(partition: TopicAndPartition, currState: PartitionState, targetState: PartitionState, t: Throwable): Unit = {
+    val epochLogger = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
+    epochLogger.error(
+      "Controller %d epoch %d failed to change state for partition %s from %s to %s"
+        .format(controllerId, controllerContext.epoch, partition, currState, targetState),
+      t)
+  }
+}
+
+/**
+ * Leader selection rules used by the partition state machine. Every method only inspects its arguments
+ * and returns the chosen leader, or None when no replica qualifies.
+ */
+object PartitionLeaderElectionAlgorithms {
+  /**
+   * Picks the first assigned replica that is both live and in the isr. If there is none and unclean leader
+   * election is enabled, falls back to the first assigned replica that is live.
+   */
+  def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanLeaderElectionEnabled: Boolean): Option[Int] = {
+    val liveInSyncReplica = assignment.find(replica => isr.contains(replica) && liveReplicas(replica))
+    liveInSyncReplica.orElse {
+      assignment.find(replica => uncleanLeaderElectionEnabled && liveReplicas(replica))
     }
   }
+
+  /** Picks the first replica of the reassignment that is both live and in the isr. */
+  def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
+    reassignment.collectFirst { case replica if liveReplicas(replica) && isr.contains(replica) => replica }
+  }
+
+  /** Picks the first assigned replica, but only if it is both live and in the isr. */
+  def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
+    assignment.headOption match {
+      case Some(preferred) if liveReplicas(preferred) && isr.contains(preferred) => Some(preferred)
+      case _ => None
+    }
+  }
+
+  /** Picks the first assigned replica that is live, in the isr and not shutting down. */
+  def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = {
+    val eligible = (replica: Int) => isr.contains(replica) && liveReplicas(replica) && !shuttingDownBrokers(replica)
+    assignment.find(eligible)
+  }
 }
 
+sealed trait PartitionLeaderElectionStrategy
+case object OfflinePartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
+
 sealed trait PartitionState {
   def state: Byte
   def validPreviousStates: Set[PartitionState]


[5/5] kafka git commit: KAFKA-5642; Use async ZookeeperClient in Controller

Posted by ju...@apache.org.
KAFKA-5642; Use async ZookeeperClient in Controller

Kafka today uses ZkClient, a wrapper client around the raw Zookeeper client. This library only exposes synchronous apis to the user. Synchronous apis mean we must wait an entire round trip before doing the next operation.

This becomes problematic with partition-heavy clusters, as we find the controller spending a significant amount of time just sending many sequential reads and writes to zookeeper at the per-partition granularity. This especially becomes an issue during:
- controller failover, where the newly elected controller effectively reads all zookeeper state.
- broker failures and controlled shutdown. The controller tries to elect a new leader for partitions previously led by the broker. The controller also removes the broker from isr on partitions for which the broker was a follower. These all incur partition-granular reads and writes to zookeeper.

As a first step in addressing these issues, we built a low-level wrapper client called ZookeeperClient in KAFKA-5501 that encourages pipelined, asynchronous apis.

This patch converts the controller to use the async ZookeeperClient to improve controller failover, broker failure handling, and controlled shutdown times.

Some notable changes made in this patch:
- All ControllerEvents now defer access to zookeeper until processing time instead of accessing it at enqueue time, as was intended with the single-threaded event queue model patch from KAFKA-5028. This results in a fresh view of the zookeeper state by the time we process the event. This reverts the hacks from KAFKA-5502 and KAFKA-5879.
- We refactored PartitionStateMachine and ReplicaStateMachine to process multiple partitions and replicas in batch rather than one-at-a-time so that we can send a batch of requests over to ZookeeperClient to pipeline.
- We've decoupled ZookeeperClient handler registration from watcher registration. Previously, these two were coupled, which meant handler registrations actually sent out a request to the zookeeper ensemble to do the actual watcher registration. In KafkaController.onControllerFailover, we register partition modification handlers (thereby registering watchers) and additionally look up the partition assignments for every topic in the cluster. We can shave a bit of time off failover if we merge these two operations. We can do this by decoupling ZookeeperClient handler registration from watcher registration. This means ZookeeperClient's registration apis have been changed so that they are purely in-memory operations, and they only take effect when the client sends ExistsRequest, GetDataRequest, or GetChildrenRequest.
- We've simplified the logic for updating LeaderAndIsr such that if we get a BADVERSION error code, the controller will now just retry in the next round by reading the new state and trying the update again. This simplifies logic when updating the partition leader epoch, removing replicas from isr, and electing leaders for partitions.
- We've implemented KAFKA-5083: always leave the last surviving member of the ISR in ZK. This means that if people re-disable unclean leader election, we can still try to elect the leader from the last in-sync replica.
- ZookeeperClient's handlers have been changed so that their methods default to no-ops for convenience.
- All znode paths and definitions for znode encoding and decoding have been consolidated as static methods in ZkData.scala.
- The partition leader election algorithms have been refactored as pure functions so that they can be easily unit tested.
- PartitionStateMachine and ReplicaStateMachine now have unit tests.

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #3765 from onurkaraman/KAFKA-5642


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b71ee043
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b71ee043
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b71ee043

Branch: refs/heads/trunk
Commit: b71ee043f8959ee0d4699071ba8fc1e2c5675842
Parents: 68f324f
Author: Onur Karaman <ok...@linkedin.com>
Authored: Wed Oct 18 09:14:59 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Oct 18 09:14:59 2017 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/api/LeaderAndIsr.scala |    2 +-
 .../controller/ControllerChannelManager.scala   |    6 +-
 .../controller/ControllerEventManager.scala     |   17 +-
 .../kafka/controller/KafkaController.scala      | 1406 +++++++-----------
 .../controller/KafkaControllerZkUtils.scala     |  684 +++++++++
 .../controller/PartitionLeaderSelector.scala    |  205 ---
 .../controller/PartitionStateMachine.scala      |  582 +++++---
 .../kafka/controller/ReplicaStateMachine.scala  |  433 +++---
 .../kafka/controller/TopicDeletionManager.scala |   43 +-
 .../main/scala/kafka/controller/ZkData.scala    |  248 +++
 .../kafka/controller/ZookeeperClient.scala      |  127 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   22 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   20 +-
 .../controller/ControllerIntegrationTest.scala  |    2 +-
 .../PartitionLeaderElectionAlgorithmsTest.scala |  176 +++
 .../controller/PartitionStateMachineTest.scala  |  311 ++++
 .../controller/ReplicaStateMachineTest.scala    |  371 +++++
 .../kafka/controller/ZookeeperClientTest.scala  |   42 +-
 .../ControlledShutdownLeaderSelectorTest.scala  |   73 -
 .../unit/kafka/server/LeaderElectionTest.scala  |    2 +-
 20 files changed, 3163 insertions(+), 1609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/api/LeaderAndIsr.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsr.scala b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
index 029d476..7a83cf3 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsr.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsr.scala
@@ -38,7 +38,7 @@ case class LeaderAndIsr(leader: Int,
 
   def newLeader(leader: Int) = newLeaderAndIsr(leader, isr)
 
-  def newLeaderAndIsr(leader: Int, isr: List[Int]) = LeaderAndIsr(leader, leaderEpoch + 1, isr, zkVersion + 1)
+  def newLeaderAndIsr(leader: Int, isr: List[Int]) = LeaderAndIsr(leader, leaderEpoch + 1, isr, zkVersion)
 
   def newEpochAndZkVersion = newLeaderAndIsr(leader, isr)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 5ac85cc..9fef617 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -314,7 +314,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                                       replicas: Seq[Int], isNew: Boolean = false) {
+                                       replicas: Seq[Int], isNew: Boolean) {
     val topicPartition = new TopicPartition(topic, partition)
 
     brokerIds.filter(_ >= 0).foreach { brokerId =>
@@ -334,7 +334,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
   }
 
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
-                                      callback: (AbstractResponse, Int) => Unit = null) {
+                                      callback: (AbstractResponse, Int) => Unit) {
     brokerIds.filter(b => b >= 0).foreach { brokerId =>
       stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
       val v = stopReplicaRequestMap(brokerId)
@@ -349,7 +349,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
 
   /** Send UpdateMetadataRequest to the given brokers for the given partitions and partitions that are being deleted */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
-                                         partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+                                         partitions: collection.Set[TopicAndPartition]) {
 
     def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
       val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/ControllerEventManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index f7ed54e..396a39d 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -18,12 +18,14 @@
 package kafka.controller
 
 import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection._
+import java.util.concurrent.locks.ReentrantLock
 
 import kafka.metrics.KafkaTimer
+import kafka.utils.CoreUtils.inLock
 import kafka.utils.ShutdownableThread
 
+import scala.collection._
+
 object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
 }
@@ -31,7 +33,7 @@ class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer
                              eventProcessedListener: ControllerEvent => Unit) {
 
   @volatile private var _state: ControllerState = ControllerState.Idle
-
+  private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[ControllerEvent]
   private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
 
@@ -41,7 +43,14 @@ class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer
 
   def close(): Unit = thread.shutdown()
 
-  def put(event: ControllerEvent): Unit = queue.put(event)
+  def put(event: ControllerEvent): Unit = inLock(putLock) {
+    queue.put(event)
+  }
+
+  def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) {
+    queue.clear()
+    queue.put(event)
+  }
 
   class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
     override def doWork(): Unit = {


[2/5] kafka git commit: KAFKA-5642; Use async ZookeeperClient in Controller

Posted by ju...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
old mode 100755
new mode 100644
index f140f02..f811612
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -16,11 +16,15 @@
 */
 package kafka.controller
 
+import kafka.api.LeaderAndIsr
 import kafka.common.{StateChangeFailedException, TopicAndPartition}
 import kafka.controller.Callbacks.CallbackBuilder
-import kafka.utils.{Logging, ReplicationUtils}
+import kafka.server.KafkaConfig
+import kafka.utils.Logging
+import org.apache.zookeeper.KeeperException
+import org.apache.zookeeper.KeeperException.Code
 
-import scala.collection._
+import scala.collection.mutable
 
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
@@ -40,27 +44,26 @@ import scala.collection._
  * 7. NonExistentReplica: If a replica is deleted successfully, it is moved to this state. Valid previous state is
  *                        ReplicaDeletionSuccessful
  */
-class ReplicaStateMachine(controller: KafkaController, stateChangeLogger: StateChangeLogger) extends Logging {
-
-  private val controllerContext = controller.controllerContext
-  private val controllerId = controller.config.brokerId
-  private val zkUtils = controllerContext.zkUtils
-  private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller, stateChangeLogger)
+class ReplicaStateMachine(config: KafkaConfig,
+                          stateChangeLogger: StateChangeLogger,
+                          controllerContext: ControllerContext,
+                          topicDeletionManager: TopicDeletionManager,
+                          zkUtils: KafkaControllerZkUtils,
+                          replicaState: mutable.Map[PartitionAndReplica, ReplicaState],
+                          controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends Logging {
+  private val controllerId = config.brokerId
 
   this.logIdent = s"[ReplicaStateMachine controllerId=$controllerId] "
 
-
   /**
-   * Invoked on successful controller election. First registers a broker change listener since that triggers all
-   * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
-   * Then triggers the OnlineReplica state change for all replicas.
+   * Invoked on successful controller election.
    */
   def startup() {
+    info("Initializing replica state")
     initializeReplicaState()
-    handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
-
-    info("Started replica state machine with initial state -> " + replicaState.toString())
+    info("Triggering online replica state changes")
+    handleStateChanges(controllerContext.allLiveReplicas().toSeq, OnlineReplica)
+    info(s"Started replica state machine with initial state -> $replicaState")
   }
 
   /**
@@ -68,24 +71,38 @@ class ReplicaStateMachine(controller: KafkaController, stateChangeLogger: StateC
    */
   def shutdown() {
     replicaState.clear()
-
     info("Stopped replica state machine")
   }
 
   /**
-   * This API is invoked by the broker change controller callbacks and the startup API of the state machine
-   * @param replicas     The list of replicas (brokers) that need to be transitioned to the target state
-   * @param targetState  The state that the replicas should be moved to
-   * The controller's allLeaders cache should have been updated before this
+   * Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions
+   * in zookeeper
    */
-  def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
-                         callbacks: Callbacks = (new CallbackBuilder).build) {
+  private def initializeReplicaState() {
+    controllerContext.partitionReplicaAssignment.foreach { case (partition, replicas) =>
+      replicas.foreach { replicaId =>
+        val partitionAndReplica = PartitionAndReplica(partition.topic, partition.partition, replicaId)
+        if (controllerContext.isReplicaOnline(replicaId, partition))
+          replicaState.put(partitionAndReplica, OnlineReplica)
+        else
+        // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
+        // This is required during controller failover since during controller failover a broker can go down,
+        // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
+          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
+      }
+    }
+  }
+
+  def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState,
+                         callbacks: Callbacks = (new CallbackBuilder).build): Unit = {
     if (replicas.nonEmpty) {
-      info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
       try {
-        brokerRequestBatch.newBatch()
-        replicas.foreach(r => handleStateChange(r, targetState, callbacks))
-        brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
+        controllerBrokerRequestBatch.newBatch()
+        replicas.groupBy(_.replica).map { case (replicaId, replicas) =>
+          val partitions = replicas.map(_.topicAndPartition)
+          doHandleStateChanges(replicaId, partitions, targetState, callbacks)
+        }
+        controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
       } catch {
         case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
       }
@@ -122,178 +139,260 @@ class ReplicaStateMachine(controller: KafkaController, stateChangeLogger: StateC
    *
    * ReplicaDeletionSuccessful -> NonExistentReplica
    * -- remove the replica from the in memory partition replica assignment cache
-
-
-   * @param partitionAndReplica The replica for which the state transition is invoked
+   *
+   * @param replicaId The replica for which the state transition is invoked
+   * @param partitions The partitions on this replica for which the state transition is invoked
    * @param targetState The end state that the replica should be moved to
    */
-  def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
-                        callbacks: Callbacks) {
-    val topic = partitionAndReplica.topic
-    val partition = partitionAndReplica.partition
-    val replicaId = partitionAndReplica.replica
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
-    val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)
-    try {
-
-      def logStateChange(): Unit =
-        stateChangeLog.trace(s"Changed state of replica $replicaId for partition $topicAndPartition from " +
-          s"$currState to $targetState")
-
-      val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
-      assertValidTransition(partitionAndReplica, targetState)
-      targetState match {
-        case NewReplica =>
-          // start replica as a follower to the current leader for its partition
-          val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
-          leaderIsrAndControllerEpochOpt match {
+  private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicAndPartition], targetState: ReplicaState,
+                                   callbacks: Callbacks): Unit = {
+    val replicas = partitions.map(partition => PartitionAndReplica(partition.topic, partition.partition, replicaId))
+    replicas.foreach(replica => replicaState.getOrElseUpdate(replica, NonExistentReplica))
+    val (validReplicas, invalidReplicas) = replicas.partition(replica => isValidTransition(replica, targetState))
+    invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))
+    targetState match {
+      case NewReplica =>
+        validReplicas.foreach { replica =>
+          val partition = replica.topicAndPartition
+          controllerContext.partitionLeadershipInfo.get(partition) match {
             case Some(leaderIsrAndControllerEpoch) =>
-              if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
-                throw new StateChangeFailedException(s"Replica $replicaId for partition $topicAndPartition cannot " +
-                  s"be moved to NewReplica state as it is being requested to become leader")
-              brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                                                                  topic, partition, leaderIsrAndControllerEpoch,
-                                                                  replicaAssignment, isNew = true)
-            case None => // new leader request will be sent to this replica when one gets elected
+              if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
+                val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
+                logFailedStateChange(replica, replicaState(replica), OfflineReplica, exception)
+              } else {
+                controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
+                  replica.topic,
+                  replica.partition,
+                  leaderIsrAndControllerEpoch,
+                  controllerContext.partitionReplicaAssignment(replica.topicAndPartition),
+                  isNew = true)
+                logSuccessfulTransition(replicaId, partition, replicaState(replica), NewReplica)
+                replicaState.put(replica, NewReplica)
+              }
+            case None =>
+              logSuccessfulTransition(replicaId, partition, replicaState(replica), NewReplica)
+              replicaState.put(replica, NewReplica)
           }
-          replicaState.put(partitionAndReplica, NewReplica)
-          logStateChange()
-        case ReplicaDeletionStarted =>
-          replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
-          // send stop replica command
-          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
-            callbacks.stopReplicaResponseCallback)
-          logStateChange()
-        case ReplicaDeletionIneligible =>
-          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
-          logStateChange()
-        case ReplicaDeletionSuccessful =>
-          replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
-          logStateChange()
-        case NonExistentReplica =>
-          // remove this replica from the assigned replicas list for its partition
-          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-          controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
-          replicaState.remove(partitionAndReplica)
-          logStateChange()
-        case OnlineReplica =>
-          replicaState(partitionAndReplica) match {
+        }
+      case OnlineReplica =>
+        validReplicas.foreach { replica =>
+          val partition = replica.topicAndPartition
+          replicaState(replica) match {
             case NewReplica =>
-              // add this replica to the assigned replicas list for its partition
-              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-              if(!currentAssignedReplicas.contains(replicaId))
-                controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              logStateChange()
+              val assignment = controllerContext.partitionReplicaAssignment(partition)
+              if (!assignment.contains(replicaId)) {
+                controllerContext.partitionReplicaAssignment.put(partition, assignment :+ replicaId)
+              }
             case _ =>
-              // check if the leader for this partition ever existed
-              controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
+              controllerContext.partitionLeadershipInfo.get(partition) match {
                 case Some(leaderIsrAndControllerEpoch) =>
-                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
-                    replicaAssignment)
-                  replicaState.put(partitionAndReplica, OnlineReplica)
-                  logStateChange()
-                case None => // that means the partition was never in OnlinePartition state, this means the broker never
-                             // started a log for that partition and does not have a high watermark value for this partition
+                  controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
+                    replica.topic,
+                    replica.partition,
+                    leaderIsrAndControllerEpoch,
+                    controllerContext.partitionReplicaAssignment(partition), isNew = false)
+                case None =>
               }
           }
-          replicaState.put(partitionAndReplica, OnlineReplica)
-        case OfflineReplica =>
-          // send stop replica command to the replica so that it stops fetching from the leader
-          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
-          // As an optimization, the controller removes dead replicas from the ISR
-          val leaderAndIsrIsEmpty: Boolean =
-            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
-              case Some(_) =>
-                controller.removeReplicaFromIsr(topic, partition, replicaId) match {
-                  case Some(updatedLeaderIsrAndControllerEpoch) =>
-                    // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
-                    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-                    if (!controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition)) {
-                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
-                        topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
-                    }
-                    replicaState.put(partitionAndReplica, OfflineReplica)
-                    logStateChange()
-                    false
-                  case None =>
-                    true
-                }
-              case None =>
-                true
-            }
-          if (leaderAndIsrIsEmpty && !controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition))
-            throw new StateChangeFailedException(
-              s"Failed to change state of replica $replicaId for partition $topicAndPartition since the leader " +
-                s"and isr path in zookeeper is empty")
+          logSuccessfulTransition(replicaId, partition, replicaState(replica), OnlineReplica)
+          replicaState.put(replica, OnlineReplica)
+        }
+      case OfflineReplica =>
+        validReplicas.foreach { replica =>
+          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topic, replica.partition, deletePartition = false, null)
+        }
+        val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicAndPartition))
+        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicAndPartition))
+        updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
+          if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
+            val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
+            controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
+              partition.topic,
+              partition.partition,
+              leaderIsrAndControllerEpoch,
+              controllerContext.partitionReplicaAssignment(partition), isNew = false)
+          }
+          val replica = PartitionAndReplica(partition.topic, partition.partition, replicaId)
+          logSuccessfulTransition(replicaId, partition, replicaState(replica), OfflineReplica)
+          replicaState.put(replica, OfflineReplica)
+        }
+      case ReplicaDeletionStarted =>
+        validReplicas.foreach { replica =>
+          logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), ReplicaDeletionStarted)
+          replicaState.put(replica, ReplicaDeletionStarted)
+          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId),
+            replica.topic,
+            replica.partition,
+            deletePartition = true,
+            callbacks.stopReplicaResponseCallback)
+        }
+      case ReplicaDeletionIneligible =>
+        validReplicas.foreach { replica =>
+          logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), ReplicaDeletionIneligible)
+          replicaState.put(replica, ReplicaDeletionIneligible)
+        }
+      case ReplicaDeletionSuccessful =>
+        validReplicas.foreach { replica =>
+          logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), ReplicaDeletionSuccessful)
+          replicaState.put(replica, ReplicaDeletionSuccessful)
+        }
+      case NonExistentReplica =>
+        validReplicas.foreach { replica =>
+          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicAndPartition)
+          controllerContext.partitionReplicaAssignment.put(replica.topicAndPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
+          logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), NonExistentReplica)
+          replicaState.remove(replica)
+        }
+    }
+  }
 
+  /**
+   * Repeatedly attempt to remove a replica from the isr of multiple partitions until there are no more remaining partitions
+   * to retry.
+   * @param replicaId The replica being removed from isr of multiple partitions
+   * @param partitions The partitions from which we're trying to remove the replica from isr
+   * @return The updated LeaderIsrAndControllerEpochs of all partitions for which we successfully removed the replica from isr.
+   */
+  private def removeReplicasFromIsr(replicaId: Int, partitions: Seq[TopicAndPartition]):
+  Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+    var results = Map.empty[TopicAndPartition, LeaderIsrAndControllerEpoch]
+    var remaining = partitions
+    while (remaining.nonEmpty) {
+      val (successfulRemovals, removalsToRetry, failedRemovals) = doRemoveReplicasFromIsr(replicaId, remaining)
+      results ++= successfulRemovals
+      remaining = removalsToRetry
+      failedRemovals.foreach { case (partition, e) =>
+        val replica = PartitionAndReplica(partition.topic, partition.partition, replicaId)
+        logFailedStateChange(replica, replicaState(replica), OfflineReplica, e)
       }
     }
-    catch {
-      case t: Throwable =>
-        stateChangeLog.error(s"Initiated state change of replica $replicaId for partition $topicAndPartition from " +
-          s"$currState to $targetState failed", t)
+    results
+  }
+
+  /**
+   * Try to remove a replica from the isr of multiple partitions.
+   * Removing a replica from isr updates partition state in zookeeper.
+   *
+   * @param replicaId The replica being removed from isr of multiple partitions
+   * @param partitions The partitions from which we're trying to remove the replica from isr
+   * @return A tuple of three values:
+   *         1. The updated LeaderIsrAndControllerEpochs of all partitions for which we successfully removed the replica from isr.
+   *         2. The partitions that we should retry due to a zookeeper BADVERSION conflict. Version conflicts can occur if
+   *         the partition leader updated partition state while the controller attempted to update partition state.
+   *         3. Exceptions corresponding to failed removals that should not be retried.
+   */
+  private def doRemoveReplicasFromIsr(replicaId: Int, partitions: Seq[TopicAndPartition]):
+  (Map[TopicAndPartition, LeaderIsrAndControllerEpoch],
+    Seq[TopicAndPartition],
+    Map[TopicAndPartition, Exception]) = {
+    val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk, failedStateReads) = getTopicPartitionStatesFromZk(partitions)
+    val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (partition, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
+    val adjustedLeaderAndIsrs = leaderAndIsrsWithReplica.mapValues { leaderAndIsr =>
+      val newLeader = if (replicaId == leaderAndIsr.leader) LeaderAndIsr.NoLeader else leaderAndIsr.leader
+      val adjustedIsr = if (leaderAndIsr.isr.size == 1) leaderAndIsr.isr else leaderAndIsr.isr.filter(_ != replicaId)
+      leaderAndIsr.newLeaderAndIsr(newLeader, adjustedIsr)
+    }
+    val (successfulUpdates, updatesToRetry, failedUpdates) = zkUtils.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch)
+    val exceptionsForPartitionsWithNoLeaderAndIsrInZk = partitionsWithNoLeaderAndIsrInZk.flatMap { partition =>
+      if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
+        val exception = new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $partition since the leader and isr path in zookeeper is empty")
+        Option(partition -> exception)
+      } else None
+    }.toMap
+    val leaderIsrAndControllerEpochs = (leaderAndIsrsWithoutReplica ++ successfulUpdates).map { case (partition, leaderAndIsr) =>
+      val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
+      controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+      partition -> leaderIsrAndControllerEpoch
     }
+    (leaderIsrAndControllerEpochs, updatesToRetry, failedStateReads ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk ++ failedUpdates)
   }
 
-  def areAllReplicasForTopicDeleted(topic: String): Boolean = {
-    val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
-    val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
-    debug(s"Are all replicas for topic $topic deleted $replicaStatesForTopic")
-    replicaStatesForTopic.forall(_._2 == ReplicaDeletionSuccessful)
+  /**
+   * Gets the partition state from zookeeper
+   * @param partitions the partitions whose state we want from zookeeper
+   * @return A tuple of three values:
+   *         1. The LeaderAndIsrs of partitions whose state we successfully read from zookeeper
+   *         2. The partitions that had no leader and isr state in zookeeper. This happens if the controller
+   *         didn't finish partition initialization.
+   *         3. Exceptions corresponding to failed zookeeper lookups or states whose controller epoch exceeds our current epoch.
+   */
+  private def getTopicPartitionStatesFromZk(partitions: Seq[TopicAndPartition]):
+  (Map[TopicAndPartition, LeaderAndIsr],
+    Seq[TopicAndPartition],
+    Map[TopicAndPartition, Exception]) = {
+    val leaderAndIsrs = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
+    val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicAndPartition]
+    val failed = mutable.Map.empty[TopicAndPartition, Exception]
+    val getDataResponses = try {
+      zkUtils.getTopicPartitionStatesRaw(partitions)
+    } catch {
+      case e: Exception => // the batched read itself threw: report every requested partition as failed
+        partitions.foreach(partition => failed.put(partition, e))
+        return (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap)
+    }
+    getDataResponses.foreach { getDataResponse =>
+      val partition = getDataResponse.ctx.asInstanceOf[TopicAndPartition]
+      if (Code.get(getDataResponse.rc) == Code.OK) {
+        val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
+        if (leaderIsrAndControllerEpochOpt.isEmpty) { // decode returned None: treated like a missing state node
+          partitionsWithNoLeaderAndIsrInZk += partition
+        } else {
+          val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
+          if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) { // written under a newer epoch than ours
+            val exception = new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+              s"means the current controller with epoch ${controllerContext.epoch} went through a soft failure and another " +
+              s"controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}. Aborting state change by this controller")
+            failed.put(partition, exception)
+          } else {
+            leaderAndIsrs.put(partition, leaderIsrAndControllerEpoch.leaderAndIsr)
+          }
+        }
+      } else if (Code.get(getDataResponse.rc) == Code.NONODE) { // state znode does not exist
+        partitionsWithNoLeaderAndIsrInZk += partition
+      } else {
+        failed.put(partition, KeeperException.create(Code.get(getDataResponse.rc)))
+      }
+    }
+    (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap)
   }
 
   def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = {
-    val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
-    val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
-    replicaStatesForTopic.foldLeft(false)((deletionState, r) => deletionState || r._2 == ReplicaDeletionStarted)
+    controllerContext.replicasForTopic(topic).exists(replica => replicaState(replica) == ReplicaDeletionStarted)
   }
 
   def replicasInState(topic: String, state: ReplicaState): Set[PartitionAndReplica] = {
-    replicaState.filter(r => r._1.topic.equals(topic) && r._2 == state).keySet
+    replicaState.filter { case (replica, s) => replica.topic.equals(topic) && s == state }.keySet.toSet
   }
 
-  def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = {
-    replicaState.exists(r => r._1.topic.equals(topic) && r._2 == state)
+  def areAllReplicasForTopicDeleted(topic: String): Boolean = {
+    controllerContext.replicasForTopic(topic).forall(replica => replicaState(replica) == ReplicaDeletionSuccessful)
   }
 
-  def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = {
-    val deletionStates = Set[ReplicaState](ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
-    replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet
+  def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = {
+    replicaState.exists { case (replica, s) => replica.topic.equals(topic) && s == state}
   }
 
-  private def assertValidTransition(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState): Unit = {
-    assert(targetState.validPreviousStates.contains(replicaState(partitionAndReplica)),
-      "Replica %s should be in the %s states before moving to %s state"
-        .format(partitionAndReplica, targetState.validPreviousStates.mkString(","), targetState) +
-        ". Instead it is in %s state".format(replicaState(partitionAndReplica)))
-  }
+  private def isValidTransition(replica: PartitionAndReplica, targetState: ReplicaState) =
+    targetState.validPreviousStates.contains(replicaState(replica))
 
-  /**
-   * Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions
-   * in zookeeper
-   */
-  private def initializeReplicaState() {
-    for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
-      val topic = topicPartition.topic
-      val partition = topicPartition.partition
-      assignedReplicas.foreach { replicaId =>
-        val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
-        if (controllerContext.isReplicaOnline(replicaId, topicPartition))
-          replicaState.put(partitionAndReplica, OnlineReplica)
-        else
-          // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
-          // This is required during controller failover since during controller failover a broker can go down,
-          // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
-          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
-      }
-    }
+  private def logSuccessfulTransition(replicaId: Int, partition: TopicAndPartition, currState: ReplicaState, targetState: ReplicaState): Unit = {
+    stateChangeLogger.withControllerEpoch(controllerContext.epoch)
+      .trace(s"Changed state of replica $replicaId for partition $partition from $currState to $targetState")
   }
 
-  def partitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
-    controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
+  private def logInvalidTransition(replica: PartitionAndReplica, targetState: ReplicaState): Unit = {
+    val currState = replicaState(replica)
+    val e = new IllegalStateException("Replica %s should be in the %s states before moving to %s state"
+      .format(replica, targetState.validPreviousStates.mkString(","), targetState) + ". Instead it is in %s state"
+      .format(currState))
+    logFailedStateChange(replica, currState, targetState, e)
   }
 
+  private def logFailedStateChange(replica: PartitionAndReplica, currState: ReplicaState, targetState: ReplicaState, t: Throwable): Unit = {
+    stateChangeLogger.withControllerEpoch(controllerContext.epoch)
+      .error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
+      .format(controllerId, controllerContext.epoch, replica.replica, replica.topic, replica.partition, currState, targetState), t)
+  }
 }
 
 sealed trait ReplicaState {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 325488e..52302a3 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -18,9 +18,7 @@ package kafka.controller
 
 
 import kafka.common.TopicAndPartition
-import kafka.server.ConfigType
 import kafka.utils.Logging
-import kafka.utils.ZkUtils._
 
 import scala.collection.{Set, mutable}
 
@@ -57,11 +55,11 @@ import scala.collection.{Set, mutable}
  *    it marks the topic for deletion retry.
  * @param controller
  */
-class TopicDeletionManager(controller: KafkaController, eventManager: ControllerEventManager) extends Logging {
+class TopicDeletionManager(controller: KafkaController,
+                           eventManager: ControllerEventManager,
+                           kafkaControllerZkUtils: KafkaControllerZkUtils) extends Logging {
   this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
   val controllerContext = controller.controllerContext
-  val partitionStateMachine = controller.partitionStateMachine
-  val replicaStateMachine = controller.replicaStateMachine
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
   val topicsToBeDeleted = mutable.Set.empty[String]
   val partitionsToBeDeleted = mutable.Set.empty[TopicAndPartition]
@@ -74,12 +72,8 @@ class TopicDeletionManager(controller: KafkaController, eventManager: Controller
       topicsIneligibleForDeletion ++= initialTopicsIneligibleForDeletion & topicsToBeDeleted
     } else {
       // if delete topic is disabled clean the topic entries under /admin/delete_topics
-      val zkUtils = controllerContext.zkUtils
-      for (topic <- initialTopicsToBeDeleted) {
-        val deleteTopicPath = getDeleteTopicPath(topic)
-        info("Removing " + deleteTopicPath + " since delete topic is disabled")
-        zkUtils.deletePath(deleteTopicPath)
-      }
+      info("Removing " + initialTopicsToBeDeleted + " since delete topic is disabled")
+      kafkaControllerZkUtils.deleteTopicDeletions(initialTopicsToBeDeleted.toSeq)
     }
   }
 
@@ -144,7 +138,7 @@ class TopicDeletionManager(controller: KafkaController, eventManager: Controller
         val topics = replicasThatFailedToDelete.map(_.topic)
         debug("Deletion failed for replicas %s. Halting deletion for topics %s"
           .format(replicasThatFailedToDelete.mkString(","), topics))
-        controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
+        controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete.toSeq, ReplicaDeletionIneligible)
         markTopicIneligibleForDeletion(topics)
         resumeDeletions()
       }
@@ -203,7 +197,7 @@ class TopicDeletionManager(controller: KafkaController, eventManager: Controller
   def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
     val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
     debug("Deletion successfully completed for replicas %s".format(successfullyDeletedReplicas.mkString(",")))
-    controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas, ReplicaDeletionSuccessful)
+    controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas.toSeq, ReplicaDeletionSuccessful)
     resumeDeletions()
   }
 
@@ -229,26 +223,25 @@ class TopicDeletionManager(controller: KafkaController, eventManager: Controller
     val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
     info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
       .format(topic, failedReplicas.mkString(",")))
-    controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
+    controller.replicaStateMachine.handleStateChanges(failedReplicas.toSeq, OfflineReplica)
   }
 
   private def completeDeleteTopic(topic: String) {
     // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
     // firing before the new topic listener when a deleted topic gets auto created
-    controller.deregisterPartitionModificationsListener(topic)
+    controller.unregisterPartitionModificationsHandlers(Seq(topic))
     val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
     // controller will remove this replica from the state machine as well as its partition assignment cache
-    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
+    controller.replicaStateMachine.handleStateChanges(replicasForDeletedTopic.toSeq, NonExistentReplica)
     val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
     // move respective partition to OfflinePartition and NonExistentPartition state
-    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
-    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
+    controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq, OfflinePartition)
+    controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq, NonExistentPartition)
     topicsToBeDeleted -= topic
     partitionsToBeDeleted.retain(_.topic != topic)
-    val zkUtils = controllerContext.zkUtils
-    zkUtils.deletePathRecursive(getTopicPath(topic))
-    zkUtils.deletePathRecursive(getEntityConfigPath(ConfigType.Topic, topic))
-    zkUtils.deletePath(getDeleteTopicPath(topic))
+    kafkaControllerZkUtils.deleteTopicZNode(topic)
+    kafkaControllerZkUtils.deleteTopicConfigs(Seq(topic))
+    kafkaControllerZkUtils.deleteTopicDeletions(Seq(topic))
     controllerContext.removeTopic(topic)
   }
 
@@ -293,11 +286,11 @@ class TopicDeletionManager(controller: KafkaController, eventManager: Controller
       val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
       val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
       // move dead replicas directly to failed state
-      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
+      controller.replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, ReplicaDeletionIneligible)
       // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
-      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
+      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica)
       debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
-      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
+      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted,
         new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
           eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))).build)
       if (deadReplicasForTopic.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZkData.scala b/core/src/main/scala/kafka/controller/ZkData.scala
new file mode 100644
index 0000000..2240b6a
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/ZkData.scala
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.controller
+
+import java.util.Properties
+
+import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
+import kafka.cluster.{Broker, EndPoint}
+import kafka.common.TopicAndPartition
+import kafka.utils.Json
+import org.apache.zookeeper.data.Stat
+
+import scala.collection.Seq
+
+object ControllerZNode {
+  def path = "/controller"
+  def encode(brokerId: Int, timestamp: Long): Array[Byte] =
+    Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)).getBytes("UTF-8")
+  def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
+    js.asJsonObject("brokerid").to[Int]
+  }
+}
+
+object ControllerEpochZNode {
+  def path = "/controller_epoch"
+  def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes("UTF-8")
+  def decode(bytes: Array[Byte]) : Int = new String(bytes, "UTF-8").toInt
+}
+
+object ConfigZNode {
+  def path = "/config"
+  def encode: Array[Byte] = null
+}
+
+object BrokersZNode {
+  def path = "/brokers"
+  def encode: Array[Byte] = null
+}
+
+object BrokerIdsZNode {
+  def path = s"${BrokersZNode.path}/ids"
+  def encode: Array[Byte] = null
+}
+
+object BrokerIdZNode {
+  def path(id: Int) = s"${BrokerIdsZNode.path}/$id"
+  def encode(id: Int,
+             host: String,
+             port: Int,
+             advertisedEndpoints: Seq[EndPoint],
+             jmxPort: Int,
+             rack: Option[String],
+             apiVersion: ApiVersion): Array[Byte] = {
+    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
+    Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack).getBytes("UTF-8")
+  }
+
+  def decode(id: Int, bytes: Array[Byte]): Broker = {
+    Broker.createBroker(id, new String(bytes, "UTF-8"))
+  }
+}
+
+object TopicsZNode {
+  def path = s"${BrokersZNode.path}/topics"
+  def encode: Array[Byte] = null
+}
+
+object TopicZNode {
+  def path(topic: String) = s"${TopicsZNode.path}/$topic"
+  def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
+    val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas }
+    Json.encode(Map("version" -> 1, "partitions" -> assignmentJson)).getBytes("UTF-8")
+  }
+  def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
+    Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
+      val assignmentJson = js.asJsonObject
+      val partitionsJsonOpt = assignmentJson.get("partitions").map(_.asJsonObject)
+      partitionsJsonOpt.map { partitionsJson =>
+        partitionsJson.iterator.map { case (partition, replicas) =>
+          TopicAndPartition(topic, partition.toInt) -> replicas.to[Seq[Int]]
+        }
+      }
+    }.map(_.toMap).getOrElse(Map.empty)
+  }
+}
+
+object TopicPartitionsZNode {
+  def path(topic: String) = s"${TopicZNode.path(topic)}/partitions"
+  def encode: Array[Byte] = null
+}
+
+object TopicPartitionZNode {
+  def path(partition: TopicAndPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
+  def encode: Array[Byte] = null
+}
+
+object TopicPartitionStateZNode {
+  def path(partition: TopicAndPartition) = s"${TopicPartitionZNode.path(partition)}/state"
+  def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
+    val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
+    val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+    Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
+      "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)).getBytes("UTF-8")
+  }
+  def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
+    Json.parseFull(new String(bytes, "UTF-8")).map { js =>
+      val leaderIsrAndEpochInfo = js.asJsonObject
+      val leader = leaderIsrAndEpochInfo("leader").to[Int]
+      val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int]
+      val isr = leaderIsrAndEpochInfo("isr").to[List[Int]]
+      val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int]
+      val zkPathVersion = stat.getVersion
+      LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch)
+    }
+  }
+}
+
+object ConfigEntityTypeZNode {
+  def path(entityType: String) = s"${ConfigZNode.path}/$entityType"
+  def encode: Array[Byte] = null
+}
+
+object ConfigEntityZNode {
+  def path(entityType: String, entityName: String) = s"${ConfigEntityTypeZNode.path(entityType)}/$entityName"
+  def encode(config: Properties): Array[Byte] = {
+    import scala.collection.JavaConverters._
+    Json.encode(Map("version" -> 1, "config" -> config.asScala)).getBytes("UTF-8")
+  }
+  def decode(bytes: Array[Byte]): Option[Properties] = {
+    Json.parseFull(new String(bytes, "UTF-8")).map { js =>
+      val configOpt = js.asJsonObjectOption.flatMap(_.get("config").flatMap(_.asJsonObjectOption))
+      val props = new Properties()
+      configOpt.foreach(config => config.iterator.foreach { case (k, v) => props.setProperty(k, v.to[String]) })
+      props
+    }
+  }
+}
+
+object IsrChangeNotificationZNode {
+  def path = "/isr_change_notification"
+  def encode: Array[Byte] = null
+}
+
+object IsrChangeNotificationSequenceZNode {
+  val SequenceNumberPrefix = "isr_change_"
+  def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
+  def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
+    val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
+    Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson)).getBytes("UTF-8")
+  }
+
+  /** Decodes the "partitions" array of a notification; yields an empty set when Json.parseFull returns no value. */
+  def decode(bytes: Array[Byte]): Set[TopicAndPartition] =
+    Json.parseFull(new String(bytes, "UTF-8")).map { js =>
+      val partitionsJson = js.asJsonObject("partitions").asJsonArray
+      partitionsJson.iterator.map { partitionJs =>
+        val partitionFields = partitionJs.asJsonObject
+        val topic = partitionFields("topic").to[String]
+        val partition = partitionFields("partition").to[Int]
+        TopicAndPartition(topic, partition)
+      }
+    }.map(_.toSet).getOrElse(Set.empty)
+  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
+}
+
+object LogDirEventNotificationZNode {
+  def path = "/log_dir_event_notification"
+  def encode: Array[Byte] = null
+}
+
+object LogDirEventNotificationSequenceZNode {
+  val SequenceNumberPrefix = "log_dir_event_"
+  val LogDirFailureEvent = 1
+  def path(sequenceNumber: String) = s"${LogDirEventNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
+  def encode(brokerId: Int) =
+    Json.encode(Map("version" -> 1, "broker" -> brokerId, "event" -> LogDirFailureEvent)).getBytes("UTF-8")
+  def decode(bytes: Array[Byte]): Option[Int] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
+    js.asJsonObject("broker").to[Int]
+  }
+  def sequenceNumber(path: String) = path.substring(path.lastIndexOf(SequenceNumberPrefix) + SequenceNumberPrefix.length)
+}
+
+object AdminZNode {
+  def path = "/admin"
+  def encode: Array[Byte] = null
+}
+
+object DeleteTopicsZNode {
+  def path = s"${AdminZNode.path}/delete_topics"
+  def encode: Array[Byte] = null
+}
+
+object DeleteTopicsTopicZNode {
+  def path(topic: String) = s"${DeleteTopicsZNode.path}/$topic"
+  def encode: Array[Byte] = null
+}
+
+object ReassignPartitionsZNode {
+  def path = s"${AdminZNode.path}/reassign_partitions"
+  def encode(reassignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
+    val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition), replicas) =>
+      Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas)
+    }
+    Json.encode(Map("version" -> 1, "partitions" -> reassignmentJson)).getBytes("UTF-8")
+  }
+  def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseFull(new String(bytes, "UTF-8")).flatMap { js =>
+    val reassignmentJson = js.asJsonObject
+    val partitionsJsonOpt = reassignmentJson.get("partitions")
+    partitionsJsonOpt.map { partitionsJson =>
+      partitionsJson.asJsonArray.iterator.map { partitionFieldsJs =>
+        val partitionFields = partitionFieldsJs.asJsonObject
+        val topic = partitionFields("topic").to[String]
+        val partition = partitionFields("partition").to[Int]
+        val replicas = partitionFields("replicas").to[Seq[Int]]
+        TopicAndPartition(topic, partition) -> replicas
+      }
+    }
+  }.map(_.toMap).getOrElse(Map.empty)
+}
+
+object PreferredReplicaElectionZNode { // JSON codec for the preferred_replica_election znode under AdminZNode.path
+  def path = s"${AdminZNode.path}/preferred_replica_election"
+  def encode(partitions: Set[TopicAndPartition]): Array[Byte] =
+    Json.encode(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))).getBytes("UTF-8")
+  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseFull(new String(bytes, "UTF-8")).map { js =>
+    val partitionsJson = js.asJsonObject("partitions").asJsonArray
+    partitionsJson.iterator.map { partitionJs => // one array element: a single {topic, partition} object
+      val partitionFields = partitionJs.asJsonObject
+      val topic = partitionFields("topic").to[String]
+      val partition = partitionFields("partition").to[Int]
+      TopicAndPartition(topic, partition)
+    }
+  }.map(_.toSet).getOrElse(Set.empty) // empty set when Json.parseFull returns no value
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/controller/ZookeeperClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ZookeeperClient.scala b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
index 7ffa511..e68c738 100644
--- a/core/src/main/scala/kafka/controller/ZookeeperClient.scala
+++ b/core/src/main/scala/kafka/controller/ZookeeperClient.scala
@@ -29,13 +29,13 @@ import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper}
 
 /**
-  * ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper.
-  *
-  * @param connectString comma separated host:port pairs, each corresponding to a zk server
-  * @param sessionTimeoutMs session timeout in milliseconds
-  * @param connectionTimeoutMs connection timeout in milliseconds
-  * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
-  */
+ * ZookeeperClient is a zookeeper client that encourages pipelined requests to zookeeper.
+ *
+ * @param connectString comma separated host:port pairs, each corresponding to a zk server
+ * @param sessionTimeoutMs session timeout in milliseconds
+ * @param connectionTimeoutMs connection timeout in milliseconds
+ * @param stateChangeHandler state change handler callbacks called by the underlying zookeeper client's EventThread.
+ */
 class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTimeoutMs: Int, stateChangeHandler: StateChangeHandler) extends Logging {
   this.logIdent = "[ZookeeperClient]: "
   private val initializationLock = new ReentrantReadWriteLock()
@@ -49,21 +49,29 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
   waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS)
 
   /**
-    * Take an AsyncRequest and wait for its AsyncResponse.
-    * @param request a single AsyncRequest to wait on.
-    * @return the request's AsyncReponse.
-    */
+   * Take an AsyncRequest and wait for its AsyncResponse. See handle(Seq[AsyncRequest]) for details.
+   *
+   * @param request a single AsyncRequest to wait on.
+   * @return the request's AsyncResponse.
+   */
   def handle(request: AsyncRequest): AsyncResponse = {
     handle(Seq(request)).head
   }
 
   /**
-    * Pipeline a sequence of AsyncRequests and wait for all of their AsyncResponses.
-    * @param requests a sequence of AsyncRequests to wait on.
-    * @return the AsyncResponses.
-    */
+   * Pipeline a sequence of AsyncRequests and wait for all of their AsyncResponses.
+   *
+   * The watch flag on each outgoing request will be set if we've already registered a handler for the
+   * path associated with the AsyncRequest.
+   *
+   * @param requests a sequence of AsyncRequests to wait on.
+   * @return the AsyncResponses.
+   */
   def handle(requests: Seq[AsyncRequest]): Seq[AsyncResponse] = inReadLock(initializationLock) {
     import scala.collection.JavaConverters._
+    if (requests.isEmpty) {
+      return Seq.empty
+    }
     val countDownLatch = new CountDownLatch(requests.size)
     val responseQueue = new ArrayBlockingQueue[AsyncResponse](requests.size)
     requests.foreach {
@@ -113,10 +121,10 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
   }
 
   /**
-    * Wait indefinitely until the underlying zookeeper client to reaches the CONNECTED state.
-    * @throws ZookeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
-    * @throws ZookeeperClientExpiredException if the session expired either before or while waiting for connection.
-    */
+   * Wait indefinitely until the underlying zookeeper client reaches the CONNECTED state.
+   * @throws ZookeeperClientAuthFailedException if the authentication failed either before or while waiting for connection.
+   * @throws ZookeeperClientExpiredException if the session expired either before or while waiting for connection.
+   */
   def waitUntilConnected(): Unit = inLock(isConnectedOrExpiredLock) {
     waitUntilConnected(Long.MaxValue, TimeUnit.MILLISECONDS)
   }
@@ -142,30 +150,43 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
     info("Connected.")
   }
 
-  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): ExistsResponse = {
-    registerZNodeChangeHandlers(Seq(zNodeChangeHandler)).head
-  }
-
-  def registerZNodeChangeHandlers(handlers: Seq[ZNodeChangeHandler]): Seq[ExistsResponse] = {
-    handlers.foreach(handler => zNodeChangeHandlers.put(handler.path, handler))
-    val asyncRequests = handlers.map(handler => ExistsRequest(handler.path, null))
-    handle(asyncRequests).asInstanceOf[Seq[ExistsResponse]]
+  /**
+   * Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher.
+   *
+   * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest])
+   * with either a GetDataRequest or ExistsRequest.
+   *
+   * NOTE: zookeeper only allows registration to a nonexistent znode with ExistsRequest.
+   *
+   * @param zNodeChangeHandler the handler to register
+   */
+  def registerZNodeChangeHandler(zNodeChangeHandler: ZNodeChangeHandler): Unit = {
+    zNodeChangeHandlers.put(zNodeChangeHandler.path, zNodeChangeHandler)
   }
 
+  /**
+   * Unregister the handler from ZookeeperClient. This is just a local operation.
+   * @param path the path of the handler to unregister
+   */
   def unregisterZNodeChangeHandler(path: String): Unit = {
     zNodeChangeHandlers.remove(path)
   }
 
-  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): GetChildrenResponse = {
-    registerZNodeChildChangeHandlers(Seq(zNodeChildChangeHandler)).head
-  }
-
-  def registerZNodeChildChangeHandlers(handlers: Seq[ZNodeChildChangeHandler]): Seq[GetChildrenResponse] = {
-    handlers.foreach(handler => zNodeChildChangeHandlers.put(handler.path, handler))
-    val asyncRequests = handlers.map(handler => GetChildrenRequest(handler.path, null))
-    handle(asyncRequests).asInstanceOf[Seq[GetChildrenResponse]]
+  /**
+   * Register the handler to ZookeeperClient. This is just a local operation. This does not actually register a watcher.
+   *
+   * The watcher is only registered once the user calls handle(AsyncRequest) or handle(Seq[AsyncRequest]) with a GetChildrenRequest.
+   *
+   * @param zNodeChildChangeHandler the handler to register
+   */
+  def registerZNodeChildChangeHandler(zNodeChildChangeHandler: ZNodeChildChangeHandler): Unit = {
+    zNodeChildChangeHandlers.put(zNodeChildChangeHandler.path, zNodeChildChangeHandler)
   }
 
+  /**
+   * Unregister the handler from ZookeeperClient. This is just a local operation.
+   * @param path the path of the handler to unregister
+   */
   def unregisterZNodeChildChangeHandler(path: String): Unit = {
     zNodeChildChangeHandlers.remove(path)
   }
@@ -178,6 +199,10 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
     info("Closed.")
   }
 
+  def sessionId = inReadLock(initializationLock) {
+    zooKeeper.getSessionId
+  }
+
   private def initialize(): Unit = {
     if (!zooKeeper.getState.isAlive) {
       info(s"Initializing a new session to $connectString.")
@@ -199,7 +224,7 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
         }
       }
       info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}")
-      stateChangeHandler.onConnectionTimeout
+      stateChangeHandler.onReconnectionTimeout()
     }
   }
 
@@ -212,45 +237,45 @@ class ZookeeperClient(connectString: String, sessionTimeoutMs: Int, connectionTi
         }
         if (event.getState == KeeperState.AuthFailed) {
           info("Auth failed.")
-          stateChangeHandler.onAuthFailure
+          stateChangeHandler.onAuthFailure()
         } else if (event.getState == KeeperState.Expired) {
           inWriteLock(initializationLock) {
             info("Session expired.")
-            stateChangeHandler.beforeInitializingSession
+            stateChangeHandler.beforeInitializingSession()
             initialize()
-            stateChangeHandler.afterInitializingSession
+            stateChangeHandler.afterInitializingSession()
           }
         }
       } else if (event.getType == EventType.NodeCreated) {
-        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleCreation)
+        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleCreation())
       } else if (event.getType == EventType.NodeDeleted) {
-        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDeletion)
+        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDeletion())
       } else if (event.getType == EventType.NodeDataChanged) {
-        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDataChange)
+        Option(zNodeChangeHandlers.get(event.getPath)).foreach(_.handleDataChange())
       } else if (event.getType == EventType.NodeChildrenChanged) {
-        Option(zNodeChildChangeHandlers.get(event.getPath)).foreach(_.handleChildChange)
+        Option(zNodeChildChangeHandlers.get(event.getPath)).foreach(_.handleChildChange())
       }
     }
   }
 }
 
 trait StateChangeHandler {
-  def beforeInitializingSession: Unit
-  def afterInitializingSession: Unit
-  def onAuthFailure: Unit
-  def onConnectionTimeout: Unit
+  def beforeInitializingSession(): Unit = {}
+  def afterInitializingSession(): Unit = {}
+  def onAuthFailure(): Unit = {}
+  def onReconnectionTimeout(): Unit = {}
 }
 
 trait ZNodeChangeHandler {
   val path: String
-  def handleCreation: Unit
-  def handleDeletion: Unit
-  def handleDataChange: Unit
+  def handleCreation(): Unit = {}
+  def handleDeletion(): Unit = {}
+  def handleDataChange(): Unit = {}
 }
 
 trait ZNodeChildChangeHandler {
   val path: String
-  def handleChildChange: Unit
+  def handleChildChange(): Unit = {}
 }
 
 sealed trait AsyncRequest {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index a8c0a4a..739405f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -27,7 +27,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_9_0
 import kafka.cluster.Broker
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
-import kafka.controller.KafkaController
+import kafka.controller.{KafkaController, KafkaControllerZkUtils, StateChangeHandler, ZookeeperClient}
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
 import kafka.log.{LogConfig, LogManager}
@@ -135,6 +135,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   var quotaManagers: QuotaFactory.QuotaManagers = null
 
   var zkUtils: ZkUtils = null
+  var kafkaControllerZkUtils: KafkaControllerZkUtils = null
   val correlationId: AtomicInteger = new AtomicInteger(0)
   val brokerMetaPropsFile = "meta.properties"
   val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap
@@ -233,7 +234,22 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         replicaManager.startup()
 
         /* start kafka controller */
-        kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
+        val zookeeperClient = new ZookeeperClient(config.zkConnect, config.zkSessionTimeoutMs,
+          config.zkConnectionTimeoutMs, new StateChangeHandler {
+            override def onReconnectionTimeout(): Unit = {
+              error("Reconnection timeout.")
+            }
+
+            override def afterInitializingSession(): Unit = kafkaController.newSession()
+
+            override def onAuthFailure(): Unit = {
+              error("Auth failure.")
+            }
+
+            override def beforeInitializingSession(): Unit = kafkaController.expire()
+          })
+        kafkaControllerZkUtils = new KafkaControllerZkUtils(zookeeperClient, zkUtils.isSecure)
+        kafkaController = new KafkaController(config, kafkaControllerZkUtils, time, metrics, threadNamePrefix)
         kafkaController.startup()
 
         adminManager = new AdminManager(config, metrics, metadataCache, zkUtils)
@@ -544,6 +560,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
           CoreUtils.swallow(kafkaController.shutdown())
         if (zkUtils != null)
           CoreUtils.swallow(zkUtils.close())
+        if (kafkaControllerZkUtils != null)
+          CoreUtils.swallow(kafkaControllerZkUtils.close())
 
         if (metrics != null)
           CoreUtils.swallow(metrics.close())

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 1df0916..38e0b66 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -286,11 +286,29 @@ class ZkUtils(zkClientWrap: ZooKeeperClientWrapper,
 
   def getController(): Int = {
     readDataMaybeNull(ControllerPath)._1 match {
-      case Some(controller) => KafkaController.parseControllerId(controller)
+      case Some(controller) => parseControllerId(controller)
       case None => throw new KafkaException("Controller doesn't exist")
     }
   }
 
+  /** Parses the controller's broker id from its znode data: JSON with a "brokerid" field, falling back to a bare integer (old format). */
+  def parseControllerId(controllerInfoString: String): Int = {
+    try {
+      Json.parseFull(controllerInfoString) match {
+        case Some(js) => js.asJsonObject("brokerid").to[Int]
+        case None => throw new KafkaException("Failed to parse the controller info json [%s].".format(controllerInfoString))
+      }
+    } catch {
+      case scala.util.control.NonFatal(_) =>
+        // It may be due to an incompatible controller register version. Fatal errors are deliberately not caught here so they propagate.
+        warn("Failed to parse the controller info as json. "
+          + "Probably this controller is still using the old format [%s] to store the broker id in zookeeper".format(controllerInfoString))
+        try controllerInfoString.toInt catch {
+          case scala.util.control.NonFatal(t) => throw new KafkaException("Failed to parse the controller info: " + controllerInfoString + ". This is neither the new or the old format.", t)
+        }
+    }
+  }
+
   /* Represents a cluster identifier. Stored in Zookeeper in JSON format: {"version" -> "1", "id" -> id } */
   object ClusterId {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index d5f2fe0..99cacb3 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -287,7 +287,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
       val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(Set(tp))
       leaderIsrAndControllerEpochMap.contains(tp) &&
         isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
-        leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List.empty
+        leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId)
     }, "failed to get expected partition state after entire isr went offline")
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b71ee043/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
new file mode 100644
index 0000000..f149fc9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/PartitionLeaderElectionAlgorithmsTest.scala
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+
+class PartitionLeaderElectionAlgorithmsTest  extends JUnitSuite {
+
+  @Test
+  def testOfflinePartitionLeaderElection(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq(2, 4)
+    val liveReplicas = Set(4)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
+      isr,
+      liveReplicas,
+      uncleanLeaderElectionEnabled = false)
+    assertEquals(Option(4), leaderOpt)
+  }
+
+  @Test
+  def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionDisabled(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq(2)
+    val liveReplicas = Set(4)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
+      isr,
+      liveReplicas,
+      uncleanLeaderElectionEnabled = false)
+    assertEquals(None, leaderOpt)
+  }
+  @Test
+  def testOfflinePartitionLeaderElectionLastIsrOfflineUncleanLeaderElectionEnabled(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq(2)
+    val liveReplicas = Set(4)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment,
+      isr,
+      liveReplicas,
+      uncleanLeaderElectionEnabled = true)
+    assertEquals(Option(4), leaderOpt)
+  }
+
+  @Test
+  def testReassignPartitionLeaderElection(): Unit = {
+    val reassignment = Seq(2, 4)
+    val isr = Seq(2, 4)
+    val liveReplicas = Set(4)
+    // Exercise the reassign election itself: replica 4 is the only live replica, so it is the expected leader.
+    val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment,
+      isr,
+      liveReplicas)
+    assertEquals(Option(4), leaderOpt)
+  }
+
+  @Test
+  def testReassignPartitionLeaderElectionWithNoLiveIsr(): Unit = {
+    val reassignment = Seq(2, 4)
+    val isr = Seq(2)
+    val liveReplicas = Set.empty[Int]
+    val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment,
+      isr,
+      liveReplicas)
+    assertEquals(None, leaderOpt)
+  }
+
+  @Test
+  def testReassignPartitionLeaderElectionWithEmptyIsr(): Unit = {
+    val reassignment = Seq(2, 4)
+    val isr = Seq.empty[Int]
+    val liveReplicas = Set(2)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.reassignPartitionLeaderElection(reassignment,
+      isr,
+      liveReplicas)
+    assertEquals(None, leaderOpt)
+  }
+
+  @Test
+  def testPreferredReplicaPartitionLeaderElection(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq(2, 4)
+    val liveReplicas = Set(2, 4)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment,
+      isr,
+      liveReplicas)
+    assertEquals(Option(2), leaderOpt)
+  }
+
+  @Test
+  def testPreferredReplicaPartitionLeaderElectionPreferredReplicaInIsrNotLive(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq(2)
+    val liveReplicas = Set.empty[Int]
+    val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment,
+      isr,
+      liveReplicas)
+    assertEquals(None, leaderOpt)
+  }
+
+  @Test
+  def testPreferredReplicaPartitionLeaderElectionPreferredReplicaNotInIsrLive(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq(4)
+    val liveReplicas = Set(2, 4)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment,
+      isr,
+      liveReplicas)
+    assertEquals(None, leaderOpt)
+  }
+
+  @Test
+  def testPreferredReplicaPartitionLeaderElectionPreferredReplicaNotInIsrNotLive(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq.empty[Int]
+    val liveReplicas = Set.empty[Int]
+    val leaderOpt = PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(assignment,
+      isr,
+      liveReplicas)
+    assertEquals(None, leaderOpt)
+  }
+
+  @Test
+  def testControlledShutdownPartitionLeaderElection(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq(2, 4)
+    val liveReplicas = Set(2, 4)
+    val shuttingDownBrokers = Set(2)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment,
+      isr,
+      liveReplicas,
+      shuttingDownBrokers)
+    assertEquals(Option(4), leaderOpt)
+  }
+
+  @Test
+  def testControlledShutdownPartitionLeaderElectionLastIsrShuttingDown(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq(2)
+    val liveReplicas = Set(2, 4)
+    val shuttingDownBrokers = Set(2)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment,
+      isr,
+      liveReplicas,
+      shuttingDownBrokers)
+    assertEquals(None, leaderOpt)
+  }
+
+  @Test
+  def testControlledShutdownPartitionLeaderElectionAllIsrSimultaneouslyShutdown(): Unit = {
+    val assignment = Seq(2, 4)
+    val isr = Seq(2, 4)
+    val liveReplicas = Set(2, 4)
+    val shuttingDownBrokers = Set(2, 4)
+    val leaderOpt = PartitionLeaderElectionAlgorithms.controlledShutdownPartitionLeaderElection(assignment,
+      isr,
+      liveReplicas,
+      shuttingDownBrokers)
+    assertEquals(None, leaderOpt)
+  }
+}