You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2023/05/29 22:42:24 UTC
[pulsar] branch master updated: [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, 3rd attempt (#20429)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a26cf3eb788 [fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, 3rd attempt (#20429)
a26cf3eb788 is described below
commit a26cf3eb78858ef5638650b6dbd7dab1f76a8c61
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Tue May 30 01:42:16 2023 +0300
[fix][test] Fix flaky test NonPersistentTopicTest.testMsgDropStat, 3rd attempt (#20429)
---
.../apache/pulsar/client/api/NonPersistentTopicTest.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 63ce0f00dff..4f64c4271fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -821,7 +822,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
int defaultNonPersistentMessageRate = conf.getMaxConcurrentNonPersistentMessagePerConnection();
try {
- final String topicName = "non-persistent://my-property/my-ns/stats-topic";
+ final String topicName = BrokerTestUtil.newUniqueName("non-persistent://my-property/my-ns/stats-topic");
// restart broker with lower publish rate limit
conf.setMaxConcurrentNonPersistentMessagePerConnection(1);
stopBroker();
@@ -829,12 +830,15 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
pulsar.getBrokerService().updateRates();
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-1")
.receiverQueueSize(1).subscribe();
+ @Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("subscriber-2")
.receiverQueueSize(1).subscriptionType(SubscriptionType.Shared).subscribe();
+ @Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
@@ -861,12 +865,12 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
});
});
}
- latch.await(5, TimeUnit.SECONDS);
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
NonPersistentTopic topic =
(NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
- Awaitility.await().untilAsserted(() -> {
+ Awaitility.await().ignoreExceptions().untilAsserted(() -> {
pulsar.getBrokerService().updateRates();
NonPersistentTopicStats stats = topic.getStats(false, false, false);
NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
@@ -877,9 +881,6 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
assertTrue(sub2Stats.getMsgDropRate() > 0);
});
- producer.close();
- consumer.close();
- consumer2.close();
} finally {
conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate);
}