You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/05/21 17:33:53 UTC
[kafka] branch trunk updated: MINOR: Fix flaky test TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft (#12189)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f6ba10ef9c MINOR: Fix flaky test TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft (#12189)
f6ba10ef9c is described below
commit f6ba10ef9c2c2d94473efd2fd596b172fcff494a
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Sat May 21 19:33:44 2022 +0200
MINOR: Fix flaky test TopicCommandIntegrationTest.testDescribeAtMinIsrPartitions(String).quorum=kraft (#12189)
Flaky test as failed in CI https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12184/1/tests/
The test fails because it does not wait for metadata to be propagated across brokers before killing a broker which may lead to it getting stale information. Note that a similar test was done in #12104 for a different test.
Reviewers: Kvicii Y, Ziming Deng, Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../kafka/admin/TopicCommandIntegrationTest.scala | 36 +++++++++++++---------
.../kafka/integration/KafkaServerTestHarness.scala | 6 +++-
.../kafka/server/DeleteTopicsRequestTest.scala | 8 ++---
3 files changed, 29 insertions(+), 21 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
index 26c60e1c3e..3082babd06 100644
--- a/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/TopicCommandIntegrationTest.scala
@@ -586,16 +586,10 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
try {
killBroker(0)
- val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-
if (isKRaftTest()) {
- TestUtils.ensureConsistentKRaftMetadata(
- aliveServers,
- controllerServer,
- "Timeout waiting for partition metadata propagating to brokers"
- )
+ ensureConsistentKRaftMetadata()
} else {
- TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+ TestUtils.waitForPartitionMetadata(aliveBrokers, testTopicName, 0)
}
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-replicated-partitions"))))
@@ -618,8 +612,14 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
try {
killBroker(0)
- val aliveServers = brokers.filterNot(_.config.brokerId == 0)
- TestUtils.waitForPartitionMetadata(aliveServers, testTopicName, 0)
+ if (isKRaftTest()) {
+ ensureConsistentKRaftMetadata()
+ } else {
+ TestUtils.waitUntilTrue(
+ () => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 5),
+ s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic"
+ )
+ }
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--under-min-isr-partitions"))))
val rows = output.split("\n")
@@ -697,6 +697,16 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
try {
killBroker(0)
killBroker(1)
+
+ if (isKRaftTest()) {
+ ensureConsistentKRaftMetadata()
+ } else {
+ TestUtils.waitUntilTrue(
+ () => aliveBrokers.forall(_.metadataCache.getPartitionInfo(testTopicName, 0).get.isr().size() == 4),
+ s"Timeout waiting for partition metadata propagating to brokers for $testTopicName topic"
+ )
+ }
+
val output = TestUtils.grabConsoleOutput(
topicService.describeTopic(new TopicCommandOptions(Array("--at-min-isr-partitions"))))
val rows = output.split("\n")
@@ -741,13 +751,11 @@ class TopicCommandIntegrationTest extends KafkaServerTestHarness with Logging wi
try {
killBroker(0)
- val aliveServers = brokers.filterNot(_.config.brokerId == 0)
-
if (isKRaftTest()) {
- TestUtils.ensureConsistentKRaftMetadata(aliveServers, controllerServer, "Timeout waiting for topic configs propagating to brokers")
+ ensureConsistentKRaftMetadata()
} else {
TestUtils.waitUntilTrue(
- () => aliveServers.forall(
+ () => aliveBrokers.forall(
broker =>
broker.metadataCache.getPartitionInfo(underMinIsrTopic, 0).get.isr().size() < 6 &&
broker.metadataCache.getPartitionInfo(offlineTopic, 0).get.leader() == MetadataResponse.NO_LEADER_ID),
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index f46713337a..26f4c9d4c2 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -353,10 +353,14 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
}
}
+ def aliveBrokers: Seq[KafkaBroker] = {
+ _brokers.filter(broker => alive(broker.config.brokerId)).toSeq
+ }
+
def ensureConsistentKRaftMetadata(): Unit = {
if (isKRaftTest()) {
TestUtils.ensureConsistentKRaftMetadata(
- brokers,
+ aliveBrokers,
controllerServer
)
}
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 9137558437..644f21ff3f 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -71,9 +71,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
assertTrue(error.isEmpty, s"There should be no errors, found ${response.data.responses.asScala}")
- if (isKRaftTest()) {
- TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
- }
+ ensureConsistentKRaftMetadata()
request.data.topicNames.forEach { topic =>
validateTopicIsDeleted(topic)
@@ -85,9 +83,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
val error = response.errorCounts.asScala.find(_._1 != Errors.NONE)
assertTrue(error.isEmpty, s"There should be no errors, found ${response.data.responses.asScala}")
- if (isKRaftTest()) {
- TestUtils.ensureConsistentKRaftMetadata(brokers, controllerServer)
- }
+ ensureConsistentKRaftMetadata()
response.data.responses.forEach { response =>
validateTopicIsDeleted(response.name())