You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/13 02:24:04 UTC
kafka git commit: KAFKA-5180; Fix transient failure in
ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch
Repository: kafka
Updated Branches:
refs/heads/trunk f21f8f2d4 -> 1cb39f757
KAFKA-5180; Fix transient failure in ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch
The tests previously ignored the fact that the controller does not atomically
create the /controller znode and create/increment the /controller_epoch znode.
Author: Onur Karaman <ok...@linkedin.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3038 from onurkaraman/KAFKA-5180
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cb39f75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cb39f75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cb39f75
Branch: refs/heads/trunk
Commit: 1cb39f7570b8dad3e8b5c7643fb1cbbcf382bb31
Parents: f21f8f2
Author: Onur Karaman <ok...@linkedin.com>
Authored: Sat May 13 03:23:30 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat May 13 03:23:38 2017 +0100
----------------------------------------------------------------------
.../controller/ControllerIntegrationTest.scala | 19 +++++++++----------
1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1cb39f75/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index cbb98e8..ec3a153 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -43,35 +43,30 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
def testEmptyCluster(): Unit = {
servers = makeServers(1)
TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
- TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
- "broker failed to set controller epoch")
+ waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
}
@Test
def testControllerEpochPersistsWhenAllBrokersDown(): Unit = {
servers = makeServers(1)
TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
- TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
- "broker failed to set controller epoch")
+ waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
servers.head.shutdown()
servers.head.awaitShutdown()
TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ControllerPath), "failed to kill controller")
- TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
- "controller epoch was not persisted after broker failure")
+ waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "controller epoch was not persisted after broker failure")
}
@Test
def testControllerMoveIncrementsControllerEpoch(): Unit = {
servers = makeServers(1)
TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
- TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
- "broker failed to set controller epoch")
+ waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
servers.head.shutdown()
servers.head.awaitShutdown()
servers.head.startup()
TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
- TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch + 1,
- "controller epoch was not persisted after broker failure")
+ waitUntilControllerEpoch(KafkaController.InitialControllerEpoch + 1, "controller epoch was not incremented after controller move")
}
@Test
@@ -284,6 +279,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
}, "failed to get expected partition state after entire isr went offline")
}
+ private def waitUntilControllerEpoch(epoch: Int, message: String): Unit = {
+ TestUtils.waitUntilTrue(() => zkUtils.readDataMaybeNull(ZkUtils.ControllerEpochPath)._1.map(_.toInt) == Some(epoch), message)
+ }
+
private def waitForPartitionState(tp: TopicAndPartition,
controllerEpoch: Int,
leader: Int,