You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/10/24 23:40:37 UTC
[kafka] branch 3.3 updated: MINOR: Address flakiness in `KRaftClusterTest::testDescribeQuorumRequestToBrokers` (#12738)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 5d6f8d3e371 MINOR: Address flakiness in `KRaftClusterTest::testDescribeQuorumRequestToBrokers` (#12738)
5d6f8d3e371 is described below
commit 5d6f8d3e371e56d1c18cb5db7d39af03b9138ac8
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Mon Oct 17 16:16:26 2022 -0700
MINOR: Address flakiness in `KRaftClusterTest::testDescribeQuorumRequestToBrokers` (#12738)
We have seen some errors such as the following:
```
org.opentest4j.AssertionFailedError: expected: not equal but was: <OptionalLong.empty>
Stacktrace
org.opentest4j.AssertionFailedError: expected: not equal but was: <OptionalLong.empty>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
at org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:276)
at org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:265)
at org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:260)
at org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2815)
at kafka.server.KRaftClusterTest.$anonfun$testDescribeQuorumRequestToBrokers$5(KRaftClusterTest.scala:818)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at kafka.server.KRaftClusterTest.testDescribeQuorumRequestToBrokers(KRaftClusterTest.scala:814)
```
The patch changes some of the assertions to wait longer for the condition to be satisfied.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/server/KRaftClusterTest.scala | 39 ++++++++++++++--------
1 file changed, 26 insertions(+), 13 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index ff16d03e80d..45ab38ef4f6 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -811,20 +811,33 @@ class KRaftClusterTest {
assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId),
s"Leader ID ${quorumInfo.leaderId} was not a controller ID.")
- quorumInfo.voters.forEach { voter =>
- assertTrue(0 < voter.logEndOffset,
- s"logEndOffset for voter with ID ${voter.replicaId} was ${voter.logEndOffset}")
- assertNotEquals(OptionalLong.empty(), voter.lastFetchTimestamp)
- assertNotEquals(OptionalLong.empty(), voter.lastCaughtUpTimestamp)
- }
+ val (voters, voterResponseValid) =
+ TestUtils.computeUntilTrue(
+ admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+ .quorumInfo().get().voters()
+ ) { voters => voters.stream
+ .allMatch(voter => (voter.logEndOffset > 0
+ && voter.lastFetchTimestamp() != OptionalLong.empty()
+ && voter.lastCaughtUpTimestamp() != OptionalLong.empty()))
+ }
- assertEquals(cluster.brokers.asScala.keySet, quorumInfo.observers.asScala.map(_.replicaId).toSet)
- quorumInfo.observers.forEach { observer =>
- assertTrue(0 < observer.logEndOffset,
- s"logEndOffset for observer with ID ${observer.replicaId} was ${observer.logEndOffset}")
- assertNotEquals(OptionalLong.empty(), observer.lastFetchTimestamp)
- assertNotEquals(OptionalLong.empty(), observer.lastCaughtUpTimestamp)
- }
+ assertTrue(voterResponseValid, s"At least one voter did not return the expected state within timeout." +
+ s"The responses gathered for all the voters: ${voters.toString}")
+
+ val (observers, observerResponseValid) =
+ TestUtils.computeUntilTrue(
+ admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+ .quorumInfo().get().observers()
+ ) { observers =>
+ (
+ cluster.brokers.asScala.keySet == observers.asScala.map(_.replicaId).toSet
+ && observers.stream.allMatch(observer => (observer.logEndOffset > 0
+ && observer.lastFetchTimestamp() != OptionalLong.empty()
+ && observer.lastCaughtUpTimestamp() != OptionalLong.empty())))
+ }
+
+ assertTrue(observerResponseValid, s"At least one observer did not return the expected state within timeout." +
+ s"The responses gathered for all the observers: ${observers.toString}")
} finally {
admin.close()
}