You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/04/01 16:17:42 UTC

[kafka] branch trunk updated: KAFKA-8030: Fix flaky tests in TopicCommandWithAdminClientTest

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e560bae  KAFKA-8030: Fix flaky tests in TopicCommandWithAdminClientTest
e560bae is described below

commit e560bae22a33c0f39d3347c82d5edb120719c02c
Author: Viktor Somogyi-Vass <vi...@gmail.com>
AuthorDate: Mon Apr 1 21:47:20 2019 +0530

    KAFKA-8030: Fix flaky tests in TopicCommandWithAdminClientTest
    
    This change adds waits for metadata updates after killing the broker in order to make the tests more stable.
    
    Author: Viktor Somogyi-Vass <vi...@gmail.com>
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
    
    Closes #6505 from viktorsomogyi/flaky-min-isr-test
---
 .../admin/TopicCommandWithAdminClientTest.scala    | 37 +++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index e0597de..f963d8f 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -28,6 +28,9 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{ListTopicsOptions, NewTopic, AdminClient => JAdminClient}
 import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{After, Before, Rule, Test}
 import org.junit.rules.TestName
@@ -515,9 +518,35 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
     waitForTopicCreated(testTopicName)
 
     try {
+      // check which partition is on broker 0 which we'll kill
+      val testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
+        .all().get().asScala(testTopicName)
+      val partitionOnBroker0 = testTopicDescription.partitions().asScala.find(_.leader().id() == 0).get.partition()
+
       killBroker(0)
+
+      // wait until the topic metadata for the test topic is propagated to each alive broker
+      TestUtils.waitUntilTrue(() => {
+        servers
+          .filterNot(_.config.brokerId == 0)
+          .foldLeft(true) {
+            (result, server) => {
+              val topicMetadatas = server.dataPlaneRequestProcessor.metadataCache
+                .getTopicMetadata(Set(testTopicName), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+              val testPartitionMetadata = topicMetadatas.find(_.topic().equals(testTopicName)).get.partitionMetadata().asScala.find(_.partition() == partitionOnBroker0)
+              testPartitionMetadata match {
+                case None => fail(s"Partition metadata is not found in metadata cache")
+                case Some(metadata) => {
+                  result && metadata.error() == Errors.LEADER_NOT_AVAILABLE
+                }
+              }
+            }
+          }
+      }, s"Partition metadata for $testTopicName is not propagated")
+
+      // grab the console output and assert
       val output = TestUtils.grabConsoleOutput(
-        topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--unavailable-partitions"))))
+          topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--unavailable-partitions"))))
       val rows = output.split("\n")
       assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
       assertTrue(rows(0).endsWith("Leader: none\tReplicas: 0\tIsr: "))
@@ -534,6 +563,8 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
 
     try {
       killBroker(0)
+      val aliveServers = servers.filterNot(_.config.brokerId == 0)
+      TestUtils.waitUntilMetadataIsPropagated(aliveServers, testTopicName, 0)
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
       val rows = output.split("\n")
@@ -554,6 +585,8 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
 
     try {
       killBroker(0)
+      val aliveServers = servers.filterNot(_.config.brokerId == 0)
+      TestUtils.waitUntilMetadataIsPropagated(aliveServers, testTopicName, 0)
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
       val rows = output.split("\n")
@@ -596,6 +629,8 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
 
     try {
       killBroker(0)
+      val aliveServers = servers.filterNot(_.config.brokerId == 0)
+      TestUtils.waitUntilMetadataIsPropagated(aliveServers, underMinIsrTopic, 0)
       val output = TestUtils.grabConsoleOutput(
         topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
       val rows = output.split("\n")