You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2023/05/29 22:42:24 UTC

[pulsar] branch master updated: [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, 3rd attempt (#20429)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a26cf3eb788 [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, 3rd attempt (#20429)
a26cf3eb788 is described below

commit a26cf3eb78858ef5638650b6dbd7dab1f76a8c61
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue May 30 01:42:16 2023 +0300

    [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, 3rd attempt (#20429)
---
 .../apache/pulsar/client/api/NonPersistentTopicTest.java    | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 63ce0f00dff..4f64c4271fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -821,7 +822,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
 
         int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection();
         try {
-            final String topicName = "non-persistent://my-property/my-ns/stats-topic";
+            final String topicName = BrokerTestUtil.newUniqueName("non-persistent://my-property/my-ns/stats-topic");
             // restart broker with lower publish rate limit
             conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
             stopBroker();
@@ -829,12 +830,15 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
 
             pulsar.getBrokerService().updateRates();
 
+            @Cleanup
             Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
                     .receiverQueueSize(1).subscribe();
 
+            @Cleanup
             Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
                     .receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();
 
+            @Cleanup
             ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName)
                 .enableBatching(false)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -861,12 +865,12 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
                     });
                 });
             }
-            latch.await(5, TimeUnit.SECONDS);
+            assertTrue(latch.await(5, TimeUnit.SECONDS));
 
             NonPersistentTopic topic =
                     (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
 
-            Awaitility.await().untilAsserted(() -> {
+            Awaitility.await().ignoreExceptions().untilAsserted(() -> {
                 pulsar.getBrokerService().updateRates();
                 NonPersistentTopicStats stats = topic.getStats(false, false, false);
                 NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
@@ -877,9 +881,6 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
                 assertTrue(sub2Stats.getMsgDropRate() > 0);
             });
 
-            producer.close();
-            consumer.close();
-            consumer2.close();
         } finally {
             conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate);
         }