You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ma...@apache.org on 2019/04/01 16:17:42 UTC
[kafka] branch trunk updated: KAFKA-8030: Fix flaky tests in TopicCommandWithAdminClientTest
This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e560bae KAFKA-8030: Fix flaky tests in TopicCommandWithAdminClientTest
e560bae is described below
commit e560bae22a33c0f39d3347c82d5edb120719c02c
Author: Viktor Somogyi-Vass <vi...@gmail.com>
AuthorDate: Mon Apr 1 21:47:20 2019 +0530
KAFKA-8030: Fix flaky tests in TopicCommandWithAdminClientTest
This change adds waits for metadata updates after killing the broker in order to make the tests more stable.
Author: Viktor Somogyi-Vass <vi...@gmail.com>
Reviewers: Manikumar Reddy <ma...@gmail.com>
Closes #6505 from viktorsomogyi/flaky-min-isr-test
---
.../admin/TopicCommandWithAdminClientTest.scala | 37 +++++++++++++++++++++-
1 file changed, 36 insertions(+), 1 deletion(-)
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
index e0597de..f963d8f 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
@@ -28,6 +28,9 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{ListTopicsOptions, NewTopic, AdminClient => JAdminClient}
import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.junit.{After, Before, Rule, Test}
import org.junit.rules.TestName
@@ -515,9 +518,35 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
waitForTopicCreated(testTopicName)
try {
+ // find the partition whose leader is broker 0, the broker we are about to kill
+ val testTopicDescription = adminClient.describeTopics(Collections.singletonList(testTopicName))
+ .all().get().asScala(testTopicName)
+ val partitionOnBroker0 = testTopicDescription.partitions().asScala.find(_.leader().id() == 0).get.partition()
+
killBroker(0)
+
+ // wait until every surviving broker's metadata cache reports that partition as leaderless (LEADER_NOT_AVAILABLE)
+ TestUtils.waitUntilTrue(() => {
+ servers
+ .filterNot(_.config.brokerId == 0)
+ .foldLeft(true) {
+ (result, server) => {
+ val topicMetadatas = server.dataPlaneRequestProcessor.metadataCache
+ .getTopicMetadata(Set(testTopicName), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ val testPartitionMetadata = topicMetadatas.find(_.topic().equals(testTopicName)).get.partitionMetadata().asScala.find(_.partition() == partitionOnBroker0)
+ testPartitionMetadata match {
+ case None => fail(s"Partition metadata is not found in metadata cache")
+ case Some(metadata) => {
+ result && metadata.error() == Errors.LEADER_NOT_AVAILABLE
+ }
+ }
+ }
+ }
+ }, s"Partition metadata for $testTopicName is not propagated")
+
+ // grab the console output and assert
val output = TestUtils.grabConsoleOutput(
- topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--unavailable-partitions"))))
+ topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName, "--unavailable-partitions"))))
val rows = output.split("\n")
assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
assertTrue(rows(0).endsWith("Leader: none\tReplicas: 0\tIsr: "))
@@ -534,6 +563,8 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
try {
killBroker(0)
+ val aliveServers = servers.filterNot(_.config.brokerId == 0)
+ TestUtils.waitUntilMetadataIsPropagated(aliveServers, testTopicName, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
val rows = output.split("\n")
@@ -554,6 +585,8 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
try {
killBroker(0)
+ val aliveServers = servers.filterNot(_.config.brokerId == 0)
+ TestUtils.waitUntilMetadataIsPropagated(aliveServers, testTopicName, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
val rows = output.split("\n")
@@ -596,6 +629,8 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
try {
killBroker(0)
+ val aliveServers = servers.filterNot(_.config.brokerId == 0)
+ TestUtils.waitUntilMetadataIsPropagated(aliveServers, underMinIsrTopic, 0)
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
val rows = output.split("\n")