You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/01/21 17:02:07 UTC

[kafka] branch 3.1 updated: KAFKA-13591; Fix flaky test `ControllerIntegrationTest.testTopicIdCreatedOnUpgrade` (#11666)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 6301954  KAFKA-13591; Fix flaky test `ControllerIntegrationTest.testTopicIdCreatedOnUpgrade` (#11666)
6301954 is described below

commit 630195458ccab46536d79838edbf26fc6deb3c91
Author: David Jacot <dj...@confluent.io>
AuthorDate: Fri Jan 21 17:38:40 2022 +0100

    KAFKA-13591; Fix flaky test `ControllerIntegrationTest.testTopicIdCreatedOnUpgrade` (#11666)
    
    The issue is that when `zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)` is called after the new controller is brought up, there is no guarantee that the controller has already written the topic id to the topic znode.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../controller/ControllerIntegrationTest.scala      | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 2302007..300db00 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -1127,27 +1127,26 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
     waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
-    val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    assertEquals(None, topicIdAfterCreate)
-    val emptyTopicId = controller.controllerContext.topicIds.get("t")
-    assertEquals(None, emptyTopicId)
+    assertEquals(None, zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))
+    assertEquals(None, controller.controllerContext.topicIds.get(tp.topic))
 
     servers(controllerId).shutdown()
     servers(controllerId).awaitShutdown()
     servers = makeServers(1)
     TestUtils.waitUntilTrue(() => zkClient.getControllerId.isDefined, "failed to elect a controller")
-    val topicIdAfterUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    assertNotEquals(emptyTopicId, topicIdAfterUpgrade)
+
+    val (topicIdAfterUpgrade, _) = TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
+    assertNotEquals(None, topicIdAfterUpgrade, s"topic id for ${tp.topic} not found in ZK")
+
     val controller2 = getController().kafkaController
-    assertNotEquals(emptyTopicId, controller2.controllerContext.topicIds.get("t"))
-    val topicId = controller2.controllerContext.topicIds.get("t").get
-    assertEquals(topicIdAfterUpgrade.get, topicId)
-    assertEquals("t", controller2.controllerContext.topicNames(topicId))
+    val topicId = controller2.controllerContext.topicIds.get(tp.topic)
+    assertEquals(topicIdAfterUpgrade, topicId)
+    assertEquals(tp.topic, controller2.controllerContext.topicNames(topicId.get))
 
     TestUtils.waitUntilTrue(() => servers(0).logManager.getLog(tp).isDefined, "log was not created")
 
     val topicIdInLog = servers(0).logManager.getLog(tp).get.topicId
-    assertEquals(Some(topicId), topicIdInLog)
+    assertEquals(topicId, topicIdInLog)
 
     adminZkClient.deleteTopic(tp.topic)
     TestUtils.waitUntilTrue(() => !servers.head.kafkaController.controllerContext.allTopics.contains(tp.topic),