You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/21 19:39:23 UTC
[pulsar] branch branch-2.9 updated: [fix][broker] Fixed duplicated delayed messages when all consumers disconnect (#14740)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 2624a04 [fix][broker] Fixed duplicated delayed messages when all consumers disconnect (#14740)
2624a04 is described below
commit 2624a0407fb76a9280034b2c4b99147d70946356
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 21 09:10:51 2022 -0700
[fix][broker] Fixed duplicated delayed messages when all consumers disconnect (#14740)
---
.../PersistentDispatcherMultipleConsumers.java | 1 +
.../service/persistent/DelayedDeliveryTest.java | 46 ++++++++++++++++++++++
2 files changed, 47 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a8a36cd..782c597 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -147,6 +147,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
shouldRewindBeforeReadingOrReplaying = false;
}
redeliveryMessages.clear();
+ delayedDeliveryTracker.ifPresent(DelayedDeliveryTracker::clear);
}
if (isConsumersExceededOnSubscription()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index cb870f8..480da2f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -493,4 +493,50 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
admin.topics().skipAllMessages(topic, subName);
Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
}
+
+ @Test
+ public void testDelayedDeliveryWithAllConsumersDisconnecting() throws Exception {
+ String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelays");
+ // Scenario: one delayed message is tracked, the only consumer disconnects, then a new consumer attaches; the message must arrive exactly once
+ Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+ // Publish a single message whose delivery is delayed by 5 seconds
+ producer.newMessage()
+ .value("msg")
+ .deliverAfter(5, TimeUnit.SECONDS)
+ .send();
+ // Wait until the dispatcher of subscription "sub" reports exactly one delayed message
+ Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription("sub").getDispatcher();
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1));
+ // Close the only consumer, leaving the subscription with no connected consumers
+ c1.close();
+
+ // Attach a new consumer. Since no consumers are connected at this point, this will trigger the cursor rewind
+ @Cleanup
+ Consumer<String> c2 = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .receiverQueueSize(1)
+ .subscribe();
+ // After the new consumer attaches, the dispatcher must still report exactly one delayed message
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1));
+ // The message was published with a 5 second delay, so it is expected within the 10 second receive timeout
+ Message<String> msg = c2.receive(10, TimeUnit.SECONDS);
+ assertNotNull(msg);
+
+ // No more messages: a second receive must time out and return null, i.e. the message was not delivered twice
+ msg = c2.receive(1, TimeUnit.SECONDS);
+ assertNull(msg);
+ // Once the message has been delivered, the dispatcher must report no remaining delayed messages
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
+ }
}