You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2023/05/24 16:35:24 UTC

[pulsar] branch master updated: [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat (#20387)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 903425be3af [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat (#20387)
903425be3af is described below

commit 903425be3afbbcfeb0a7213b0b7d04afe5868151
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed May 24 19:35:15 2023 +0300

    [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat (#20387)
---
 .../pulsar/client/api/NonPersistentTopicTest.java  | 29 ++++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 85274064964..c41ab3e8ccc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -64,6 +64,7 @@ import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.apache.pulsar.zookeeper.ZookeeperServerTest;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -824,10 +825,12 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             stopBroker();
             startBroker();
             Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
-                    .receiverQueueSize(1).subscribe();
+                    .receiverQueueSize(1)
+                    .messageListener((c, msg) -> {}).subscribe();
 
             Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
-                    .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();
+                    .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared)
+                    .messageListener((c, msg) -> {}).subscribe();
 
             ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName)
                 .enableBatching(false)
@@ -848,15 +851,19 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             }
             latch.await();
 
-            NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
-            pulsar.getBrokerService().updateRates();
-            NonPersistentTopicStats stats = topic.getStats(false, false, false);
-            NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
-            NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1");
-            NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2");
-            assertTrue(npStats.getMsgDropRate() > 0);
-            assertTrue(sub1Stats.getMsgDropRate() > 0);
-            assertTrue(sub2Stats.getMsgDropRate() > 0);
+            NonPersistentTopic topic =
+                    (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+            Awaitility.await().untilAsserted(() -> {
+                pulsar.getBrokerService().updateRates();
+                NonPersistentTopicStats stats = topic.getStats(false, false, false);
+                NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
+                NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1");
+                NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2");
+                assertTrue(npStats.getMsgDropRate() > 0);
+                assertTrue(sub1Stats.getMsgDropRate() > 0);
+                assertTrue(sub2Stats.getMsgDropRate() > 0);
+            });
 
             producer.close();
             consumer.close();