You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/10/24 23:40:37 UTC

[kafka] branch 3.3 updated: MINOR: Address flakiness in `KRaftClusterTest::testDescribeQuorumRequestToBrokers` (#12738)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 5d6f8d3e371 MINOR: Address flakiness in `KRaftClusterTest::testDescribeQuorumRequestToBrokers` (#12738)
5d6f8d3e371 is described below

commit 5d6f8d3e371e56d1c18cb5db7d39af03b9138ac8
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Mon Oct 17 16:16:26 2022 -0700

    MINOR: Address flakiness in `KRaftClusterTest::testDescribeQuorumRequestToBrokers` (#12738)
    
    We have seen some errors such as the following:
    ```
    org.opentest4j.AssertionFailedError: expected: not equal but was: <OptionalLong.empty>
    Stacktrace
    org.opentest4j.AssertionFailedError: expected: not equal but was: <OptionalLong.empty>
            at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
            at org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:276)
            at org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:265)
            at org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:260)
            at org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2815)
            at kafka.server.KRaftClusterTest.$anonfun$testDescribeQuorumRequestToBrokers$5(KRaftClusterTest.scala:818)
            at java.util.ArrayList.forEach(ArrayList.java:1259)
            at kafka.server.KRaftClusterTest.testDescribeQuorumRequestToBrokers(KRaftClusterTest.scala:814)
    ```
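    The patch replaces the one-shot assertions on voter and observer state with `TestUtils.computeUntilTrue`, which re-issues the describe-quorum request until every replica reports a positive log end offset and non-empty fetch/caught-up timestamps, or the wait times out.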
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/server/KRaftClusterTest.scala            | 39 ++++++++++++++--------
 1 file changed, 26 insertions(+), 13 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index ff16d03e80d..45ab38ef4f6 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -811,20 +811,33 @@ class KRaftClusterTest {
         assertTrue(cluster.controllers.asScala.keySet.contains(quorumInfo.leaderId),
           s"Leader ID ${quorumInfo.leaderId} was not a controller ID.")
 
-        quorumInfo.voters.forEach { voter =>
-          assertTrue(0 < voter.logEndOffset,
-            s"logEndOffset for voter with ID ${voter.replicaId} was ${voter.logEndOffset}")
-          assertNotEquals(OptionalLong.empty(), voter.lastFetchTimestamp)
-          assertNotEquals(OptionalLong.empty(), voter.lastCaughtUpTimestamp)
-        }
+        val (voters, voterResponseValid) =
+          TestUtils.computeUntilTrue(
+            admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+              .quorumInfo().get().voters()
+          ) { voters => voters.stream
+            .allMatch(voter => (voter.logEndOffset > 0
+              && voter.lastFetchTimestamp() != OptionalLong.empty()
+              && voter.lastCaughtUpTimestamp() != OptionalLong.empty()))
+          }
 
-        assertEquals(cluster.brokers.asScala.keySet, quorumInfo.observers.asScala.map(_.replicaId).toSet)
-        quorumInfo.observers.forEach { observer =>
-          assertTrue(0 < observer.logEndOffset,
-            s"logEndOffset for observer with ID ${observer.replicaId} was ${observer.logEndOffset}")
-          assertNotEquals(OptionalLong.empty(), observer.lastFetchTimestamp)
-          assertNotEquals(OptionalLong.empty(), observer.lastCaughtUpTimestamp)
-        }
+        assertTrue(voterResponseValid, s"At least one voter did not return the expected state within timeout." +
+          s"The responses gathered for all the voters: ${voters.toString}")
+
+        val (observers, observerResponseValid) =
+          TestUtils.computeUntilTrue(
+            admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
+              .quorumInfo().get().observers()
+          ) { observers =>
+            (
+              cluster.brokers.asScala.keySet == observers.asScala.map(_.replicaId).toSet
+                && observers.stream.allMatch(observer => (observer.logEndOffset > 0
+                && observer.lastFetchTimestamp() != OptionalLong.empty()
+                && observer.lastCaughtUpTimestamp() != OptionalLong.empty())))
+          }
+
+        assertTrue(observerResponseValid, s"At least one observer did not return the expected state within timeout." +
+            s"The responses gathered for all the observers: ${observers.toString}")
       } finally {
         admin.close()
       }