You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/15 02:36:37 UTC

[pulsar] 06/15: [fix][flaky-test] PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (#16493)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3fc367d52b58f1a14fd6f433060d236aff269676
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jul 13 00:29:58 2022 +0800

    [fix][flaky-test] PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (#16493)
    
    (cherry picked from commit 8a54fd944e4418f619954f09f5552ecd962bebf9)
---
 .../broker/service/PersistentFailoverE2ETest.java  | 31 +++++++++++++---------
 1 file changed, 19 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index c10742c59e6..9fe83f7b11d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -184,8 +185,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
 
         rolloverPerIntervalStats();
 
-        assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
-        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
+        });
 
         // 3. consumer1 should have all the messages while consumer2 should have no messages
         Message<byte[]> msg = null;
@@ -200,8 +202,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
         rolloverPerIntervalStats();
 
         // 4. messages deleted on individual acks
-        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
-        assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+        });
 
         for (int i = 0; i < numMsgs; i++) {
             String message = "my-message-" + i;
@@ -224,10 +227,12 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
             // do not ack
         }
         consumer1.close();
-        Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
 
-        verifyConsumerActive(listener2, -1);
-        verifyConsumerNotReceiveAnyStateChanges(listener1);
+        Awaitility.await().untilAsserted(() -> {
+            verifyConsumerActive(listener2, -1);
+            verifyConsumerNotReceiveAnyStateChanges(listener1);
+        });
+
         for (int i = 5; i < numMsgs; i++) {
             msg = consumer2.receive(1, TimeUnit.SECONDS);
             Assert.assertNotNull(msg);
@@ -237,8 +242,10 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
         Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
 
         rolloverPerIntervalStats();
-        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
-        assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+
+        });
 
         // 8. unsubscribe not allowed if multiple consumers connected
         try {
@@ -257,9 +264,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
             fail("Should not fail", e);
         }
 
-        Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
-        subRef = topicRef.getSubscription(subName);
-        assertNull(subRef);
+        Awaitility.await().untilAsserted(() -> {
+            assertNull(topicRef.getSubscription(subName));
+        });
 
         producer.close();
         consumer2.close();