You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/27 13:31:09 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #11687: KAFKA-13592:Fix flaky test ControllerIntegrationTest.testTopicIdUpgradeAfterReassigningPartitions

divijvaidya commented on code in PR #11687:
URL: https://github.com/apache/kafka/pull/11687#discussion_r883617842


##########
core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala:
##########
@@ -1221,59 +1221,58 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     adminZkClient.createTopic(tp.topic, 1, 1)
     waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
-    val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
-    val id = servers.head.kafkaController.controllerContext.topicIds.get(tp.topic)
+    val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)
     assertTrue(topicIdAfterCreate.isEmpty)
-    assertEquals(topicIdAfterCreate, id,
+    assertEquals(topicIdAfterCreate, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
       "expected no topic ID, but one existed")
 
     // Upgrade to IBP 2.8
-    servers(0).shutdown()
-    servers(0).awaitShutdown()
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
     servers = makeServers(1)
     waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon controller restart")
-    val topicIdAfterUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    val (topicIdAfterUpgrade, _) = TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
     assertEquals(topicIdAfterUpgrade, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
       "expected same topic ID but it can not be found")
-    assertEquals(tp.topic(), servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
+    assertEquals(tp.topic, servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
       "correct topic name expected but cannot be found in the controller context")
 
     // Downgrade back to 2.7
-    servers(0).shutdown()
-    servers(0).awaitShutdown()
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
     servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
     waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
       "failed to get expected partition state upon topic creation")
-    val topicIdAfterDowngrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    val (topicIdAfterDowngrade, _) = TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)
     assertTrue(topicIdAfterDowngrade.isDefined)
     assertEquals(topicIdAfterUpgrade, topicIdAfterDowngrade,
       "expected same topic ID but it can not be found after downgrade")
     assertEquals(topicIdAfterDowngrade, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
       "expected same topic ID in controller context but it is no longer found after downgrade")
-    assertEquals(tp.topic(), servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
+    assertEquals(tp.topic, servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
       "correct topic name expected but cannot be found in the controller context")
 
     // Reassign partitions
-    servers(0).kafkaController.eventManager.put(ApiPartitionReassignment(reassignment, _ => ()))
+    servers.head.kafkaController.eventManager.put(ApiPartitionReassignment(reassignment, _ => ()))
     waitForPartitionState(tp, 3, 0, 1,
       "failed to get expected partition state upon controller restart")
-    val topicIdAfterReassignment = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    val topicIdAfterReassignment = zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)
     assertTrue(topicIdAfterReassignment.isDefined)
     assertEquals(topicIdAfterUpgrade, topicIdAfterReassignment,
       "expected same topic ID but it can not be found after reassignment")
     assertEquals(topicIdAfterUpgrade, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
       "expected same topic ID in controller context but is no longer found after reassignment")
-    assertEquals(tp.topic(), servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
+    assertEquals(tp.topic, servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
       "correct topic name expected but cannot be found in the controller context")
 
     // Upgrade back to 2.8
-    servers(0).shutdown()
-    servers(0).awaitShutdown()
+    servers.head.shutdown()
+    servers.head.awaitShutdown()
     servers = makeServers(1)
     waitForPartitionState(tp, 3, 0, 1,
       "failed to get expected partition state upon controller restart")
-    val topicIdAfterReUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    val (topicIdAfterReUpgrade, _) = TestUtils.computeUntilTrue(zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic))(_.nonEmpty)

Review Comment:
   A similar change is required in the other tests in this file, such as `testTopicIdMigrationAndHandling`. We need `computeUntilTrue` whenever we bring up a new controller. Please update the other tests as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org