You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/18 23:39:51 UTC

kafka git commit: KAFKA-5069; add controller integration tests

Repository: kafka
Updated Branches:
  refs/heads/trunk 020ca7903 -> c4e59a338


KAFKA-5069; add controller integration tests

Test the various controller protocols by observing zookeeper and broker state.

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #2853 from onurkaraman/KAFKA-5069


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4e59a33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4e59a33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4e59a33

Branch: refs/heads/trunk
Commit: c4e59a338a045fc9d9b726ad68641e93582a8642
Parents: 020ca79
Author: Onur Karaman <ok...@linkedin.com>
Authored: Tue Apr 18 16:39:47 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Apr 18 16:39:47 2017 -0700

----------------------------------------------------------------------
 .../PreferredReplicaLeaderElectionCommand.scala |   3 +-
 .../kafka/server/ZookeeperLeaderElector.scala   |   8 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   9 +-
 .../controller/ControllerIntegrationTest.scala  | 317 +++++++++++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala |  10 +
 5 files changed, 340 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 960d526..2078774 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -102,8 +102,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
   def writePreferredReplicaElectionData(zkUtils: ZkUtils,
                                         partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
     val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
-    val partitionsList = partitionsUndergoingPreferredReplicaElection.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
-    val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+    val jsonData = ZkUtils.preferredReplicaLeaderElectionZkData(partitionsUndergoingPreferredReplicaElection)
     try {
       zkUtils.createPersistentPath(zkPath, jsonData)
       println("Created preferred replica election path with %s".format(jsonData))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index f41782e..64a401a 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,7 +17,7 @@
 package kafka.server
 
 import kafka.utils.CoreUtils._
-import kafka.utils.{Json, Logging, ZKCheckedEphemeral}
+import kafka.utils.{Json, Logging, ZKCheckedEphemeral, ZkUtils}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
@@ -60,9 +60,9 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
   }
 
   def elect: Boolean = {
-    val timestamp = time.milliseconds.toString
-    val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
-   
+    val timestamp = time.milliseconds
+    val electString = ZkUtils.controllerZkData(brokerId, timestamp)
+
    leaderId = getControllerID 
     /* 
      * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition, 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index bef6454..724414e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -205,6 +205,14 @@ object ZkUtils {
     topics
   }
 
+  def controllerZkData(brokerId: Int, timestamp: Long): String = { // JSON payload: version 1, the broker id, and the timestamp rendered as a string
+    Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString))
+  }
+
+  def preferredReplicaLeaderElectionZkData(partitions: scala.collection.Set[TopicAndPartition]): String = { // JSON payload: version 1 plus one {topic, partition} entry per element
+    Json.encode(Map("version" -> 1, "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))))
+  }
+
   def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
     Json.encode(Map(
       "version" -> 1,
@@ -417,7 +425,6 @@ class ZkUtils(val zkClient: ZkClient,
     topicDirs.consumerOwnerDir + "/" + partition
   }
 
-
   def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
     Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
                     "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
new file mode 100644
index 0000000..5e608d1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.{After, Before, Test}
+
+class ControllerIntegrationTest extends ZooKeeperTestHarness {
+  var servers = Seq.empty[KafkaServer]
+
+  @Before
+  override def setUp() { // runs the harness setUp first, then starts every test with an empty broker list
+    super.setUp
+    servers = Seq.empty[KafkaServer]
+  }
+
+  @After
+  override def tearDown() { // shuts down every broker, deletes their log dirs, then runs the harness tearDown
+    servers.foreach(_.shutdown())
+    servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    super.tearDown
+  }
+
+  @Test
+  def testEmptyCluster(): Unit = { // one broker, no topics: the controller znode must appear and the epoch znode must hold the initial epoch
+    servers = makeServers(1)
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
+      "broker failed to set controller epoch")
+  }
+
+  @Test
+  def testControllerEpochPersistsWhenAllBrokersDown(): Unit = { // the epoch znode must keep its value after the only broker has stopped
+    servers = makeServers(1)
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
+      "broker failed to set controller epoch")
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
+    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ControllerPath), "failed to kill controller") // wait for the controller znode to disappear
+    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
+      "controller epoch was not persisted after broker failure")
+  }
+
+  @Test
+  def testControllerMoveIncrementsControllerEpoch(): Unit = { // a second controller election must raise the epoch znode by exactly one
+    servers = makeServers(1)
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
+      "broker failed to set controller epoch")
+    servers.head.shutdown() // restart the only broker so that a new election has to happen
+    servers.head.awaitShutdown()
+    servers.head.startup()
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch + 1,
+      "controller epoch was not incremented after controller move")
+  }
+
+  @Test
+  def testTopicCreation(): Unit = { // a partition assigned to broker 0 must end up with leader 0 at the initial leader epoch
+    servers = makeServers(1)
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(0))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+  }
+
+  @Test
+  def testTopicCreationWithOfflineReplica(): Unit = { // topic created while its first-listed replica is down: the controller broker must become leader
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    servers(otherBrokerId).shutdown() // broker id is used as the index into servers; assumes servers is ordered by broker id
+    servers(otherBrokerId).awaitShutdown()
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = Seq(servers(controllerId))) // pass only the live broker; take(1) would select the stopped one whenever the controller is not broker 0
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+  }
+
+  @Test
+  def testTopicPartitionExpansion(): Unit = { // adding partition 1 to an existing topic must give it leader 0 at the initial leader epoch
+    servers = makeServers(1)
+    val tp0 = TopicAndPartition("t", 0)
+    val tp1 = TopicAndPartition("t", 1)
+    val assignment = Map(tp0.partition -> Seq(0))
+    val expandedAssignment = Map(tp0.partition -> Seq(0), tp1.partition -> Seq(0))
+    TestUtils.createTopic(zkUtils, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
+    zkUtils.updatePersistentPath(ZkUtils.getTopicPath(tp0.topic), zkUtils.replicaAssignmentZkData(expandedAssignment.map(kv => kv._1.toString -> kv._2))) // overwrite the topic znode with the expanded assignment, partition ids converted to string keys
+    waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic partition expansion")
+    TestUtils.waitUntilMetadataIsPropagated(servers, tp1.topic, tp1.partition)
+  }
+
+  @Test
+  def testTopicPartitionExpansionWithOfflineReplica(): Unit = { // partition added while its first-listed replica is down: the controller broker must lead it
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp0 = TopicAndPartition("t", 0)
+    val tp1 = TopicAndPartition("t", 1)
+    val assignment = Map(tp0.partition -> Seq(otherBrokerId, controllerId))
+    val expandedAssignment = Map(tp0.partition -> Seq(otherBrokerId, controllerId), tp1.partition -> Seq(otherBrokerId, controllerId))
+    TestUtils.createTopic(zkUtils, tp0.topic, partitionReplicaAssignment = assignment, servers = servers)
+    servers(otherBrokerId).shutdown() // broker id is used as the index into servers; assumes servers is ordered by broker id
+    servers(otherBrokerId).awaitShutdown()
+    zkUtils.updatePersistentPath(ZkUtils.getTopicPath(tp0.topic), zkUtils.replicaAssignmentZkData(expandedAssignment.map(kv => kv._1.toString -> kv._2))) // overwrite the topic znode with the expanded assignment, partition ids converted to string keys
+    waitForPartitionState(tp1, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic partition expansion")
+    TestUtils.waitUntilMetadataIsPropagated(Seq(servers(controllerId)), tp1.topic, tp1.partition) // only the controller broker is still running
+  }
+
+  @Test
+  def testPartitionReassignment(): Unit = { // moves the single replica from the controller broker to the other broker via the reassignment znode
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId))
+    val reassignment = Map(tp -> Seq(otherBrokerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, ZkUtils.formatAsReassignmentJson(reassignment)) // creating this znode is what requests the reassignment
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 3, // the test expects three leader-epoch bumps by the time the move completes
+      "failed to get expected partition state after partition reassignment")
+    TestUtils.waitUntilTrue(() => zkUtils.getReplicaAssignmentForTopics(Seq(tp.topic)) == reassignment,
+      "failed to get updated partition assignment on topic znode after partition reassignment")
+    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath),
+      "failed to remove reassign partitions path after completion")
+  }
+
+  @Test
+  def testPartitionReassignmentWithOfflineReplicaHaltingProgress(): Unit = { // reassignment onto a stopped broker: leadership stays on the controller broker and the znode remains
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId))
+    val reassignment = Map(tp -> Seq(otherBrokerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    servers(otherBrokerId).shutdown() // stop the reassignment target before requesting the move
+    servers(otherBrokerId).awaitShutdown()
+    zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, ZkUtils.formatAsReassignmentJson(reassignment))
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ReassignPartitionsPath), // NOTE(review): this only checks that the path exists at the moment of polling, not that it stays
+      "partition reassignment path should remain while reassignment in progress")
+  }
+
+  @Test
+  def testPartitionReassignmentResumesAfterReplicaComesOnline(): Unit = { // a reassignment stalled on a stopped broker must finish once that broker restarts
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId))
+    val reassignment = Map(tp -> Seq(otherBrokerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    servers(otherBrokerId).shutdown() // stop the reassignment target before requesting the move
+    servers(otherBrokerId).awaitShutdown()
+    zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, ZkUtils.formatAsReassignmentJson(reassignment))
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+    servers(otherBrokerId).startup() // bring the target back so the reassignment can proceed
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 4,
+      "failed to get expected partition state after partition reassignment")
+    TestUtils.waitUntilTrue(() => zkUtils.getReplicaAssignmentForTopics(Seq(tp.topic)) == reassignment,
+      "failed to get updated partition assignment on topic znode after partition reassignment")
+    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath),
+      "failed to remove reassign partitions path after completion")
+  }
+
+  @Test
+  def testPreferredReplicaLeaderElection(): Unit = { // after the first-listed replica restarts, a manual election request must hand leadership back to it
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    servers(otherBrokerId).shutdown() // take the first-listed replica down so leadership moves to the controller broker
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+    servers(otherBrokerId).startup()
+    TestUtils.waitUntilTrue(() => servers.forall(_.metadataCache.isBrokerAlive(otherBrokerId)), "broker join was not broadcasted to the cluster") // every broker's metadata cache must see it alive before the election is requested
+    zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, ZkUtils.preferredReplicaLeaderElectionZkData(Set(tp)))
+    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath),
+      "failed to remove preferred replica leader election path after completion")
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+  }
+
+  @Test
+  def testPreferredReplicaLeaderElectionWithOfflinePreferredReplica(): Unit = { // election requested while the first-listed replica is down: znode is removed, leader stays on the controller broker
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    servers(otherBrokerId).shutdown() // the first-listed replica is stopped and never restarted in this test
+    servers(otherBrokerId).awaitShutdown()
+    zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, ZkUtils.preferredReplicaLeaderElectionZkData(Set(tp)))
+    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath),
+      "failed to remove preferred replica leader election path after giving up")
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+  }
+
+  @Test
+  def testAutoPreferredReplicaLeaderElection(): Unit = { // with auto leader rebalance on, leadership must return to the first-listed replica after it restarts, without any election znode
+    servers = makeServers(2, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId)) // derive the order from the elected controller; a literal Seq(1, 0) is only right when the controller is broker 0
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    servers(otherBrokerId).shutdown() // take the first-listed replica down so leadership moves to the controller broker
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+    servers(otherBrokerId).startup()
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+  }
+
+  @Test
+  def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled(): Unit = { // only replica stops, unclean election off: expect NoLeader with the ISR still listing that broker
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(otherBrokerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+    servers(otherBrokerId).shutdown() // stop the broker hosting the only replica
+    servers(otherBrokerId).awaitShutdown()
+    TestUtils.waitUntilTrue(() => {
+      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      leaderIsrAndControllerEpochMap.contains(tp) &&
+        isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
+        leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List(otherBrokerId) // ISR checked inline because waitForPartitionState does not compare it
+    }, "failed to get expected partition state after entire isr went offline")
+  }
+
+  @Test
+  def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled(): Unit = { // only replica stops, unclean election on: expect NoLeader with an empty ISR
+    servers = makeServers(2, uncleanLeaderElectionEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(otherBrokerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+    servers(otherBrokerId).shutdown() // stop the broker hosting the only replica; a literal servers(1) would stop the controller whenever the controller is broker 1
+    servers(otherBrokerId).awaitShutdown()
+    TestUtils.waitUntilTrue(() => {
+      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      leaderIsrAndControllerEpochMap.contains(tp) &&
+        isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
+        leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List.empty // ISR checked inline because waitForPartitionState does not compare it
+    }, "failed to get expected partition state after entire isr went offline")
+  }
+
+  private def waitForPartitionState(tp: TopicAndPartition, // partition whose leader/ISR record is read through zkUtils
+                                    controllerEpoch: Int, // expected controller epoch on that record
+                                    leader: Int, // expected leader broker id
+                                    leaderEpoch: Int, // expected leader epoch
+                                    message: String): Unit = { // message is handed to waitUntilTrue alongside the polled condition
+    TestUtils.waitUntilTrue(() => {
+      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      leaderIsrAndControllerEpochMap.contains(tp) && // the partition may not have a record yet; keep polling until it does
+        isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), controllerEpoch, leader, leaderEpoch)
+    }, message)
+  }
+
+  private def isExpectedPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                       controllerEpoch: Int,
+                                       leader: Int,
+                                       leaderEpoch: Int) = // true only when controller epoch, leader and leader epoch all equal the expected values; the ISR is not compared
+    leaderIsrAndControllerEpoch.controllerEpoch == controllerEpoch &&
+      leaderIsrAndControllerEpoch.leaderAndIsr.leader == leader &&
+      leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch == leaderEpoch
+
+  private def makeServers(numConfigs: Int, autoLeaderRebalanceEnable: Boolean = false, uncleanLeaderElectionEnable: Boolean = false) = { // builds numConfigs broker configs against zkConnect and creates one server per config
+    val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect)
+    configs.foreach { config =>
+      config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString)
+      config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
+      config.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp, "1") // always set to 1, regardless of the two flags
+    }
+    configs.map(config => TestUtils.createServer(KafkaConfig.fromProps(config)))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index cd9f0b1..214fc39 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -55,6 +55,7 @@ import org.junit.Assert._
 import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.util.Try
 
 /**
  * Utility functions to help with testing
@@ -851,6 +852,15 @@ object TestUtils extends Logging {
     leader
   }
 
+  def waitUntilControllerElected(zkUtils: ZkUtils, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = { // polls zkUtils.getController() until it succeeds and returns that value
+    var controllerIdTry: Try[Int] = null // holds the outcome of the most recent attempt; assigned on every poll
+    TestUtils.waitUntilTrue(() => {
+      controllerIdTry = Try { zkUtils.getController() } // a throwing lookup is captured as a Failure and counts as "not yet"
+      controllerIdTry.isSuccess
+    }, s"Controller not elected after $timeout ms", waitTime = timeout)
+    controllerIdTry.get // the last attempt's value; Try.get rethrows if that attempt had failed
+  }
+
   def waitUntilLeaderIsKnown(servers: Seq[KafkaServer], topic: String, partition: Int,
                              timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
     val tp = new TopicPartition(topic, partition)