You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/13 02:24:04 UTC

kafka git commit: KAFKA-5180; Fix transient failure in ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch

Repository: kafka
Updated Branches:
  refs/heads/trunk f21f8f2d4 -> 1cb39f757


KAFKA-5180; Fix transient failure in ControllerIntegrationTest.testControllerMoveIncrementsControllerEpoch

The tests previously ignored the fact that the controller does not atomically
create the /controller znode and create/increment the /controller_epoch znode.

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3038 from onurkaraman/KAFKA-5180


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1cb39f75
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1cb39f75
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1cb39f75

Branch: refs/heads/trunk
Commit: 1cb39f7570b8dad3e8b5c7643fb1cbbcf382bb31
Parents: f21f8f2
Author: Onur Karaman <ok...@linkedin.com>
Authored: Sat May 13 03:23:30 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat May 13 03:23:38 2017 +0100

----------------------------------------------------------------------
 .../controller/ControllerIntegrationTest.scala   | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1cb39f75/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index cbb98e8..ec3a153 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -43,35 +43,30 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   def testEmptyCluster(): Unit = {
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
-    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
-      "broker failed to set controller epoch")
+    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
   }
 
   @Test
   def testControllerEpochPersistsWhenAllBrokersDown(): Unit = {
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
-    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
-      "broker failed to set controller epoch")
+    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
     servers.head.shutdown()
     servers.head.awaitShutdown()
     TestUtils.waitUntilTrue(() => !zkUtils.pathExists(ZkUtils.ControllerPath), "failed to kill controller")
-    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
-      "controller epoch was not persisted after broker failure")
+    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "controller epoch was not persisted after broker failure")
   }
 
   @Test
   def testControllerMoveIncrementsControllerEpoch(): Unit = {
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
-    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch,
-      "broker failed to set controller epoch")
+    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch, "broker failed to set controller epoch")
     servers.head.shutdown()
     servers.head.awaitShutdown()
     servers.head.startup()
     TestUtils.waitUntilTrue(() => zkUtils.pathExists(ZkUtils.ControllerPath), "failed to elect a controller")
-    TestUtils.waitUntilTrue(() => zkUtils.readData(ZkUtils.ControllerEpochPath)._1.toInt == KafkaController.InitialControllerEpoch + 1,
-      "controller epoch was not persisted after broker failure")
+    waitUntilControllerEpoch(KafkaController.InitialControllerEpoch + 1, "controller epoch was not incremented after controller move")
   }
 
   @Test
@@ -284,6 +279,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     }, "failed to get expected partition state after entire isr went offline")
   }
 
+  private def waitUntilControllerEpoch(epoch: Int, message: String): Unit = {
+    TestUtils.waitUntilTrue(() => zkUtils.readDataMaybeNull(ZkUtils.ControllerEpochPath)._1.map(_.toInt) == Some(epoch), message)
+  }
+
   private def waitForPartitionState(tp: TopicAndPartition,
                                     controllerEpoch: Int,
                                     leader: Int,