You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/15 02:36:37 UTC
[pulsar] 06/15: [fix][flaky-test] PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (#16493)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3fc367d52b58f1a14fd6f433060d236aff269676
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Jul 13 00:29:58 2022 +0800
[fix][flaky-test] PersistentFailoverE2ETest.testSimpleConsumerEventsWithPartition (#16493)
(cherry picked from commit 8a54fd944e4418f619954f09f5552ecd962bebf9)
---
.../broker/service/PersistentFailoverE2ETest.java | 31 +++++++++++++---------
1 file changed, 19 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
index c10742c59e6..9fe83f7b11d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -184,8 +185,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
rolloverPerIntervalStats();
- assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
- Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
+ });
// 3. consumer1 should have all the messages while consumer2 should have no messages
Message<byte[]> msg = null;
@@ -200,8 +202,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
rolloverPerIntervalStats();
// 4. messages deleted on individual acks
- Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
- assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+ });
for (int i = 0; i < numMsgs; i++) {
String message = "my-message-" + i;
@@ -224,10 +227,12 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
// do not ack
}
consumer1.close();
- Thread.sleep(CONSUMER_ADD_OR_REMOVE_WAIT_TIME);
- verifyConsumerActive(listener2, -1);
- verifyConsumerNotReceiveAnyStateChanges(listener1);
+ Awaitility.await().untilAsserted(() -> {
+ verifyConsumerActive(listener2, -1);
+ verifyConsumerNotReceiveAnyStateChanges(listener1);
+ });
+
for (int i = 5; i < numMsgs; i++) {
msg = consumer2.receive(1, TimeUnit.SECONDS);
Assert.assertNotNull(msg);
@@ -237,8 +242,10 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
Assert.assertNull(consumer2.receive(100, TimeUnit.MILLISECONDS));
rolloverPerIntervalStats();
- Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
- assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);
+
+ });
// 8. unsubscribe not allowed if multiple consumers connected
try {
@@ -257,9 +264,9 @@ public class PersistentFailoverE2ETest extends BrokerTestBase {
fail("Should not fail", e);
}
- Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
- subRef = topicRef.getSubscription(subName);
- assertNull(subRef);
+ Awaitility.await().untilAsserted(() -> {
+ assertNull(topicRef.getSubscription(subName));
+ });
producer.close();
consumer2.close();