You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/18 23:39:51 UTC
kafka git commit: KAFKA-5069; add controller integration tests
Repository: kafka
Updated Branches:
refs/heads/trunk 020ca7903 -> c4e59a338
KAFKA-5069; add controller integration tests
Test the various controller protocols by observing ZooKeeper and broker state.
Author: Onur Karaman <ok...@linkedin.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #2853 from onurkaraman/KAFKA-5069
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4e59a33
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4e59a33
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4e59a33
Branch: refs/heads/trunk
Commit: c4e59a338a045fc9d9b726ad68641e93582a8642
Parents: 020ca79
Author: Onur Karaman <ok...@linkedin.com>
Authored: Tue Apr 18 16:39:47 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Apr 18 16:39:47 2017 -0700
----------------------------------------------------------------------
.../PreferredReplicaLeaderElectionCommand.scala | 3 +-
.../kafka/server/ZookeeperLeaderElector.scala | 8 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 9 +-
.../controller/ControllerIntegrationTest.scala | 317 +++++++++++++++++++
.../test/scala/unit/kafka/utils/TestUtils.scala | 10 +
5 files changed, 340 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 960d526..2078774 100755
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -102,8 +102,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
def writePreferredReplicaElectionData(zkUtils: ZkUtils,
partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
- val partitionsList = partitionsUndergoingPreferredReplicaElection.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
- val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
+ val jsonData = ZkUtils.preferredReplicaLeaderElectionZkData(partitionsUndergoingPreferredReplicaElection)
try {
zkUtils.createPersistentPath(zkPath, jsonData)
println("Created preferred replica election path with %s".format(jsonData))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index f41782e..64a401a 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,7 +17,7 @@
package kafka.server
import kafka.utils.CoreUtils._
-import kafka.utils.{Json, Logging, ZKCheckedEphemeral}
+import kafka.utils.{Json, Logging, ZKCheckedEphemeral, ZkUtils}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.IZkDataListener
import kafka.controller.ControllerContext
@@ -60,9 +60,9 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
}
def elect: Boolean = {
- val timestamp = time.milliseconds.toString
- val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
-
+ val timestamp = time.milliseconds
+ val electString = ZkUtils.controllerZkData(brokerId, timestamp)
+
leaderId = getControllerID
/*
* We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index bef6454..724414e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -205,6 +205,14 @@ object ZkUtils {
topics
}
+  /**
+   * Encodes controller election data as a JSON string.
+   *
+   * @param brokerId  broker id written under the "brokerid" key
+   * @param timestamp timestamp written, in its string form, under the "timestamp" key
+   * @return JSON object holding "version" (always 1), "brokerid" and "timestamp"
+   */
+  def controllerZkData(brokerId: Int, timestamp: Long): String = {
+    val fields = Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString)
+    Json.encode(fields)
+  }
+
+  /**
+   * Encodes a set of partitions as the JSON string used for preferred replica leader election.
+   *
+   * @param partitions partitions to list; each becomes an object with "topic" and "partition" keys
+   * @return JSON object holding "version" (always 1) and the "partitions" collection
+   */
+  def preferredReplicaLeaderElectionZkData(partitions: scala.collection.Set[TopicAndPartition]): String = {
+    val partitionEntries = partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition))
+    Json.encode(Map("version" -> 1, "partitions" -> partitionEntries))
+  }
+
def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = {
Json.encode(Map(
"version" -> 1,
@@ -417,7 +425,6 @@ class ZkUtils(val zkClient: ZkClient,
topicDirs.consumerOwnerDir + "/" + partition
}
-
def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = {
Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch,
"controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr))
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
new file mode 100644
index 0000000..5e608d1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{CoreUtils, TestUtils, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.junit.{After, Before, Test}
+
+class ControllerIntegrationTest extends ZooKeeperTestHarness {
+  // Brokers started by the test that is currently running.
+  var servers = Seq.empty[KafkaServer]
+
+  /** Runs the harness set-up, then resets the broker list so every test starts with none. */
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    servers = Seq.empty[KafkaServer]
+  }
+
+  /**
+   * Shuts down every broker started by the test, deletes their log directories and then runs
+   * the harness tear-down. The harness tear-down sits in a `finally` block so that it still
+   * runs when a broker shutdown or a log-directory delete throws.
+   */
+  @After
+  override def tearDown(): Unit = {
+    try {
+      servers.foreach(_.shutdown())
+      servers.foreach(server => CoreUtils.delete(server.config.logDirs))
+    } finally {
+      super.tearDown()
+    }
+  }
+
+  /** A single broker must create the controller znode and write the initial controller epoch. */
+  @Test
+  def testEmptyCluster(): Unit = {
+    servers = makeServers(1)
+    val controllerZnodeExists = () => zkUtils.pathExists(ZkUtils.ControllerPath)
+    val epochIsInitial = () => {
+      val (epochData, _) = zkUtils.readData(ZkUtils.ControllerEpochPath)
+      epochData.toInt == KafkaController.InitialControllerEpoch
+    }
+    TestUtils.waitUntilTrue(controllerZnodeExists, "failed to elect a controller")
+    TestUtils.waitUntilTrue(epochIsInitial, "broker failed to set controller epoch")
+  }
+
+  /**
+   * After the only broker is shut down the controller znode must disappear while the
+   * controller epoch znode keeps the initial epoch.
+   */
+  @Test
+  def testControllerEpochPersistsWhenAllBrokersDown(): Unit = {
+    def controllerZnodeExists: Boolean = zkUtils.pathExists(ZkUtils.ControllerPath)
+    def epochInZk: Int = zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt
+    servers = makeServers(1)
+    TestUtils.waitUntilTrue(() => controllerZnodeExists, "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => epochInZk == KafkaController.InitialControllerEpoch,
+      "broker failed to set controller epoch")
+    val onlyBroker = servers.head
+    onlyBroker.shutdown()
+    onlyBroker.awaitShutdown()
+    TestUtils.waitUntilTrue(() => !controllerZnodeExists, "failed to kill controller")
+    TestUtils.waitUntilTrue(() => epochInZk == KafkaController.InitialControllerEpoch,
+      "controller epoch was not persisted after broker failure")
+  }
+
+  /**
+   * Restarting the only broker makes it go through controller election again; the controller
+   * epoch stored in ZooKeeper must then be the initial epoch plus one.
+   */
+  @Test
+  def testControllerMoveIncrementsControllerEpoch(): Unit = {
+    servers = makeServers(1)
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
+      "broker failed to set controller epoch")
+    // Bounce the only broker so that a second election has to take place.
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
+    servers.head.startup()
+    TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
+    // The failure text describes what is checked here: the epoch bump, not its persistence.
+    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch + 1,
+      "controller epoch was not incremented after controller move")
+  }
+
+  /**
+   * Creating a one-partition topic on a one-broker cluster must yield a partition state with
+   * leader 0, the initial controller epoch and the initial leader epoch.
+   */
+  @Test
+  def testTopicCreation(): Unit = {
+    servers = makeServers(1)
+    val topicPartition = TopicAndPartition("t", 0)
+    val replicasByPartition = Map(topicPartition.partition -> Seq(0))
+    TestUtils.createTopic(zkUtils, topicPartition.topic, partitionReplicaAssignment = replicasByPartition, servers = servers)
+    waitForPartitionState(
+      topicPartition,
+      KafkaController.InitialControllerEpoch,
+      0,
+      LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+  }
+
+  /**
+   * Creates a topic whose first replica is a broker that has already been shut down and
+   * expects the controller broker, the second replica, to become leader with the initial
+   * leader epoch.
+   */
+  @Test
+  def testTopicCreationWithOfflineReplica(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
+    // Hand createTopic the broker that is still running, chosen by id rather than by position,
+    // so the test does not depend on which broker won the controller election.
+    val liveServers = servers.filter(_.config.brokerId == controllerId)
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = liveServers)
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+  }
+
+  /**
+   * Rewrites the topic znode with an extra partition and expects the new partition to get
+   * leader 0 with the initial leader epoch, and its metadata to reach the broker.
+   */
+  @Test
+  def testTopicPartitionExpansion(): Unit = {
+    servers = makeServers(1)
+    val existingPartition = TopicAndPartition("t", 0)
+    val addedPartition = TopicAndPartition("t", 1)
+    val initialReplicas = Map(existingPartition.partition -> Seq(0))
+    val expandedReplicas = Map(existingPartition.partition -> Seq(0), addedPartition.partition -> Seq(0))
+    TestUtils.createTopic(zkUtils, existingPartition.topic, partitionReplicaAssignment = initialReplicas, servers = servers)
+    val topicPath = ZkUtils.getTopicPath(existingPartition.topic)
+    // replicaAssignmentZkData is given the partition ids in string form.
+    val expandedZkData = zkUtils.replicaAssignmentZkData(
+      expandedReplicas.map { case (partitionId, replicas) => partitionId.toString -> replicas })
+    zkUtils.updatePersistentPath(topicPath, expandedZkData)
+    waitForPartitionState(
+      addedPartition,
+      KafkaController.InitialControllerEpoch,
+      0,
+      LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic partition expansion")
+    TestUtils.waitUntilMetadataIsPropagated(servers, addedPartition.topic, addedPartition.partition)
+  }
+
+  /**
+   * Adds a partition while the first replica of its assignment is shut down and expects the
+   * controller broker to lead the new partition with the initial leader epoch.
+   */
+  @Test
+  def testTopicPartitionExpansionWithOfflineReplica(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val followerId = servers.map(_.config.brokerId).filterNot(_ == controllerId).head
+    val existingPartition = TopicAndPartition("t", 0)
+    val addedPartition = TopicAndPartition("t", 1)
+    val replicas = Seq(followerId, controllerId)
+    val initialReplicas = Map(existingPartition.partition -> replicas)
+    val expandedReplicas = Map(existingPartition.partition -> replicas, addedPartition.partition -> replicas)
+    TestUtils.createTopic(zkUtils, existingPartition.topic, partitionReplicaAssignment = initialReplicas, servers = servers)
+    val follower = servers(followerId)
+    follower.shutdown()
+    follower.awaitShutdown()
+    val topicPath = ZkUtils.getTopicPath(existingPartition.topic)
+    val expandedZkData = zkUtils.replicaAssignmentZkData(
+      expandedReplicas.map { case (partitionId, assigned) => partitionId.toString -> assigned })
+    zkUtils.updatePersistentPath(topicPath, expandedZkData)
+    waitForPartitionState(
+      addedPartition,
+      KafkaController.InitialControllerEpoch,
+      controllerId,
+      LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic partition expansion")
+    // Only the controller broker is checked for the new partition's metadata.
+    TestUtils.waitUntilMetadataIsPropagated(Seq(servers(controllerId)), addedPartition.topic, addedPartition.partition)
+  }
+
+  /**
+   * Reassigns a partition from the controller broker to the other broker and expects the
+   * leader to move there, the topic znode to carry the new assignment and the reassignment
+   * znode to be removed.
+   */
+  @Test
+  def testPartitionReassignment(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val targetBrokerId = servers.map(_.config.brokerId).filterNot(_ == controllerId).head
+    val topicPartition = TopicAndPartition("t", 0)
+    val initialReplicas = Map(topicPartition.partition -> Seq(controllerId))
+    val targetReplicas = Map(topicPartition -> Seq(targetBrokerId))
+    TestUtils.createTopic(zkUtils, topicPartition.topic, partitionReplicaAssignment = initialReplicas, servers = servers)
+    val reassignmentJson = ZkUtils.formatAsReassignmentJson(targetReplicas)
+    zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, reassignmentJson)
+    waitForPartitionState(
+      topicPartition,
+      KafkaController.InitialControllerEpoch,
+      targetBrokerId,
+      LeaderAndIsr.initialLeaderEpoch + 3,
+      "failed to get expected partition state after partition reassignment")
+    def assignmentUpdated: Boolean = zkUtils.getReplicaAssignmentForTopics(Seq(topicPartition.topic)) == targetReplicas
+    TestUtils.waitUntilTrue(() => assignmentUpdated,
+      "failed to get updated partition assignment on topic znode after partition reassignment")
+    def reassignmentZnodeGone: Boolean = !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)
+    TestUtils.waitUntilTrue(() => reassignmentZnodeGone,
+      "failed to remove reassign partitions path after completion")
+  }
+
+  /**
+   * Requests a reassignment onto a broker that is shut down and expects the controller broker
+   * to stay leader (leader epoch bumped once) while the reassignment znode still exists.
+   */
+  @Test
+  def testPartitionReassignmentWithOfflineReplicaHaltingProgress(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val targetBrokerId = servers.map(_.config.brokerId).filterNot(_ == controllerId).head
+    val topicPartition = TopicAndPartition("t", 0)
+    val initialReplicas = Map(topicPartition.partition -> Seq(controllerId))
+    val targetReplicas = Map(topicPartition -> Seq(targetBrokerId))
+    TestUtils.createTopic(zkUtils, topicPartition.topic, partitionReplicaAssignment = initialReplicas, servers = servers)
+    val targetBroker = servers(targetBrokerId)
+    targetBroker.shutdown()
+    targetBroker.awaitShutdown()
+    val reassignmentJson = ZkUtils.formatAsReassignmentJson(targetReplicas)
+    zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, reassignmentJson)
+    waitForPartitionState(
+      topicPartition,
+      KafkaController.InitialControllerEpoch,
+      controllerId,
+      LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+    def reassignmentZnodePresent: Boolean = zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)
+    TestUtils.waitUntilTrue(() => reassignmentZnodePresent,
+      "partition reassignment path should remain while reassignment in progress")
+  }
+
+  /**
+   * Requests a reassignment onto a broker that is shut down, then restarts that broker and
+   * expects the reassignment to finish: leader on the restarted broker, topic znode updated
+   * and the reassignment znode removed.
+   */
+  @Test
+  def testPartitionReassignmentResumesAfterReplicaComesOnline(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val targetBrokerId = servers.map(_.config.brokerId).filterNot(_ == controllerId).head
+    val topicPartition = TopicAndPartition("t", 0)
+    val initialReplicas = Map(topicPartition.partition -> Seq(controllerId))
+    val targetReplicas = Map(topicPartition -> Seq(targetBrokerId))
+    TestUtils.createTopic(zkUtils, topicPartition.topic, partitionReplicaAssignment = initialReplicas, servers = servers)
+    val targetBroker = servers(targetBrokerId)
+    targetBroker.shutdown()
+    targetBroker.awaitShutdown()
+    val reassignmentJson = ZkUtils.formatAsReassignmentJson(targetReplicas)
+    zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, reassignmentJson)
+    waitForPartitionState(
+      topicPartition,
+      KafkaController.InitialControllerEpoch,
+      controllerId,
+      LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+    // Bring the reassignment target back.
+    targetBroker.startup()
+    waitForPartitionState(
+      topicPartition,
+      KafkaController.InitialControllerEpoch,
+      targetBrokerId,
+      LeaderAndIsr.initialLeaderEpoch + 4,
+      "failed to get expected partition state after partition reassignment")
+    def assignmentUpdated: Boolean = zkUtils.getReplicaAssignmentForTopics(Seq(topicPartition.topic)) == targetReplicas
+    TestUtils.waitUntilTrue(() => assignmentUpdated,
+      "failed to get updated partition assignment on topic znode after partition reassignment")
+    def reassignmentZnodeGone: Boolean = !zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)
+    TestUtils.waitUntilTrue(() => reassignmentZnodeGone,
+      "failed to remove reassign partitions path after completion")
+  }
+
+  /**
+   * Bounces the first replica in the assignment so that leadership moves to the controller
+   * broker, then writes the preferred replica election znode and expects leadership to move
+   * back to the first replica and the znode to be removed.
+   */
+  @Test
+  def testPreferredReplicaLeaderElection(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val preferredBrokerId = servers.map(_.config.brokerId).filterNot(_ == controllerId).head
+    val topicPartition = TopicAndPartition("t", 0)
+    val replicasByPartition = Map(topicPartition.partition -> Seq(preferredBrokerId, controllerId))
+    TestUtils.createTopic(zkUtils, topicPartition.topic, partitionReplicaAssignment = replicasByPartition, servers = servers)
+    val preferredBroker = servers(preferredBrokerId)
+    preferredBroker.shutdown()
+    preferredBroker.awaitShutdown()
+    waitForPartitionState(
+      topicPartition,
+      KafkaController.InitialControllerEpoch,
+      controllerId,
+      LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+    preferredBroker.startup()
+    // Every broker's metadata cache must see the restarted broker before the election is requested.
+    def restartVisibleEverywhere: Boolean = servers.forall(_.metadataCache.isBrokerAlive(preferredBrokerId))
+    TestUtils.waitUntilTrue(() => restartVisibleEverywhere, "broker join was not broadcasted to the cluster")
+    val electionJson = ZkUtils.preferredReplicaLeaderElectionZkData(Set(topicPartition))
+    zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, electionJson)
+    def electionZnodeGone: Boolean = !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath)
+    TestUtils.waitUntilTrue(() => electionZnodeGone,
+      "failed to remove preferred replica leader election path after completion")
+    waitForPartitionState(
+      topicPartition,
+      KafkaController.InitialControllerEpoch,
+      preferredBrokerId,
+      LeaderAndIsr.initialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+  }
+
+  /**
+   * Requests a preferred replica election while the preferred replica is shut down and
+   * expects the election znode to be removed with leadership left on the controller broker.
+   */
+  @Test
+  def testPreferredReplicaLeaderElectionWithOfflinePreferredReplica(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val preferredBrokerId = servers.map(_.config.brokerId).filterNot(_ == controllerId).head
+    val topicPartition = TopicAndPartition("t", 0)
+    val replicasByPartition = Map(topicPartition.partition -> Seq(preferredBrokerId, controllerId))
+    TestUtils.createTopic(zkUtils, topicPartition.topic, partitionReplicaAssignment = replicasByPartition, servers = servers)
+    val preferredBroker = servers(preferredBrokerId)
+    preferredBroker.shutdown()
+    preferredBroker.awaitShutdown()
+    val electionJson = ZkUtils.preferredReplicaLeaderElectionZkData(Set(topicPartition))
+    zkUtils.createPersistentPath(ZkUtils.PreferredReplicaLeaderElectionPath, electionJson)
+    def electionZnodeGone: Boolean = !zkUtils.pathExists(ZkUtils.PreferredReplicaLeaderElectionPath)
+    TestUtils.waitUntilTrue(() => electionZnodeGone,
+      "failed to remove preferred replica leader election path after giving up")
+    waitForPartitionState(
+      topicPartition,
+      KafkaController.InitialControllerEpoch,
+      controllerId,
+      LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+  }
+
+  /**
+   * With automatic leader rebalancing enabled, bounces the first replica of the assignment
+   * and expects leadership to move to the controller broker while it is down and back to the
+   * first replica once it has restarted, without any election znode being written by the test.
+   */
+  @Test
+  def testAutoPreferredReplicaLeaderElection(): Unit = {
+    servers = makeServers(2, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    // Derive the assignment from the elected controller instead of hard-coding broker ids, so
+    // the broker that gets bounced is always the first replica of the assignment.
+    val assignment = Map(tp.partition -> Seq(otherBrokerId, controllerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+    servers(otherBrokerId).startup()
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch + 2,
+      "failed to get expected partition state upon broker startup")
+  }
+
+  /**
+   * Shuts down the only replica of a partition with unclean leader election left disabled
+   * and expects the partition state to show no leader, a leader epoch bumped once and an
+   * ISR that still lists the stopped broker.
+   */
+  @Test
+  def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled(): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val replicaBrokerId = servers.map(_.config.brokerId).filterNot(_ == controllerId).head
+    val topicPartition = TopicAndPartition("t", 0)
+    val replicasByPartition = Map(topicPartition.partition -> Seq(replicaBrokerId))
+    TestUtils.createTopic(zkUtils, topicPartition.topic, partitionReplicaAssignment = replicasByPartition, servers = servers)
+    waitForPartitionState(
+      topicPartition,
+      KafkaController.InitialControllerEpoch,
+      replicaBrokerId,
+      LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+    val replicaBroker = servers(replicaBrokerId)
+    replicaBroker.shutdown()
+    replicaBroker.awaitShutdown()
+    def leaderlessWithIsrRetained: Boolean =
+      zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(topicPartition)).get(topicPartition).exists { state =>
+        isExpectedPartitionState(state, KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
+          state.leaderAndIsr.isr == List(replicaBrokerId)
+      }
+    TestUtils.waitUntilTrue(() => leaderlessWithIsrRetained, "failed to get expected partition state after entire isr went offline")
+  }
+
+  /**
+   * Shuts down the only replica of a partition with unclean leader election enabled and
+   * expects the partition state to show no leader, a leader epoch bumped once and an empty ISR.
+   */
+  @Test
+  def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled(): Unit = {
+    servers = makeServers(2, uncleanLeaderElectionEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkUtils)
+    val otherBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val tp = TopicAndPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(otherBrokerId))
+    TestUtils.createTopic(zkUtils, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    waitForPartitionState(tp, KafkaController.InitialControllerEpoch, otherBrokerId, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+    // Stop the broker that holds the only replica, looked up by id rather than by a fixed
+    // position, so the test still takes the ISR offline when the controller is broker 1.
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    TestUtils.waitUntilTrue(() => {
+      val leaderIsrAndControllerEpochMap = zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+      leaderIsrAndControllerEpochMap.contains(tp) &&
+        isExpectedPartitionState(leaderIsrAndControllerEpochMap(tp), KafkaController.InitialControllerEpoch, LeaderAndIsr.NoLeader, LeaderAndIsr.initialLeaderEpoch + 1) &&
+        leaderIsrAndControllerEpochMap(tp).leaderAndIsr.isr == List.empty
+    }, "failed to get expected partition state after entire isr went offline")
+  }
+
+  /**
+   * Polls the partition's leader-and-ISR state through zkUtils until it carries the expected
+   * controller epoch, leader and leader epoch; the wait fails with `message` otherwise.
+   *
+   * @param tp              partition to look up
+   * @param controllerEpoch expected controller epoch
+   * @param leader          expected leader broker id
+   * @param leaderEpoch     expected leader epoch
+   * @param message         failure message handed to TestUtils.waitUntilTrue
+   */
+  private def waitForPartitionState(tp: TopicAndPartition,
+                                    controllerEpoch: Int,
+                                    leader: Int,
+                                    leaderEpoch: Int,
+                                    message: String): Unit = {
+    // A partition that has no state yet counts as "not matching".
+    def stateMatches: Boolean =
+      zkUtils.getPartitionLeaderAndIsrForTopics(null, Set(tp))
+        .get(tp)
+        .exists(state => isExpectedPartitionState(state, controllerEpoch, leader, leaderEpoch))
+    TestUtils.waitUntilTrue(() => stateMatches, message)
+  }
+
+  /**
+   * Returns true when the given state has exactly the expected controller epoch, leader and
+   * leader epoch. The ISR is not inspected.
+   */
+  private def isExpectedPartitionState(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
+                                       controllerEpoch: Int,
+                                       leader: Int,
+                                       leaderEpoch: Int): Boolean = {
+    val observed = leaderIsrAndControllerEpoch.leaderAndIsr
+    leaderIsrAndControllerEpoch.controllerEpoch == controllerEpoch &&
+      observed.leader == leader &&
+      observed.leaderEpoch == leaderEpoch
+  }
+
+  /**
+   * Creates `numConfigs` broker configs against the test ZooKeeper, applies the given
+   * leader-rebalance and unclean-election settings plus a leader imbalance check interval of
+   * "1" second, and creates one server per config.
+   *
+   * @param numConfigs                  number of brokers to create
+   * @param autoLeaderRebalanceEnable   value for KafkaConfig.AutoLeaderRebalanceEnableProp
+   * @param uncleanLeaderElectionEnable value for KafkaConfig.UncleanLeaderElectionEnableProp
+   * @return the servers returned by TestUtils.createServer, in config order
+   */
+  private def makeServers(numConfigs: Int, autoLeaderRebalanceEnable: Boolean = false, uncleanLeaderElectionEnable: Boolean = false) = {
+    val overrides = Seq(
+      KafkaConfig.AutoLeaderRebalanceEnableProp -> autoLeaderRebalanceEnable.toString,
+      KafkaConfig.UncleanLeaderElectionEnableProp -> uncleanLeaderElectionEnable.toString,
+      KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp -> "1")
+    TestUtils.createBrokerConfigs(numConfigs, zkConnect).map { props =>
+      overrides.foreach { case (key, value) => props.setProperty(key, value) }
+      TestUtils.createServer(KafkaConfig.fromProps(props))
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/c4e59a33/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index cd9f0b1..214fc39 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -55,6 +55,7 @@ import org.junit.Assert._
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.util.Try
/**
* Utility functions to help with testing
@@ -851,6 +852,15 @@ object TestUtils extends Logging {
leader
}
+  /**
+   * Polls `zkUtils.getController()` until a call succeeds and returns the id it produced.
+   *
+   * @param zkUtils ZooKeeper helper used for the lookup
+   * @param timeout maximum wait in milliseconds, passed to waitUntilTrue as `waitTime`
+   * @return the controller id returned by the first successful lookup
+   */
+  def waitUntilControllerElected(zkUtils: ZkUtils, timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Int = {
+    // Outcome of the most recent lookup; overwritten on every poll.
+    var lastAttempt: Try[Int] = null
+    def pollController(): Boolean = {
+      // Try captures a non-fatal exception from the lookup so the poll can simply retry.
+      lastAttempt = Try(zkUtils.getController())
+      lastAttempt.isSuccess
+    }
+    TestUtils.waitUntilTrue(pollController _, s"Controller not elected after $timeout ms", waitTime = timeout)
+    lastAttempt.get
+  }
+
def waitUntilLeaderIsKnown(servers: Seq[KafkaServer], topic: String, partition: Int,
timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
val tp = new TopicPartition(topic, partition)