You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/03/21 19:41:50 UTC

[pulsar] branch branch-2.8 updated: [fix][broker] Fixed duplicated delayed messages when all consumers disconnect (#14740)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new db3ad55  [fix][broker] Fixed duplicated delayed messages when all consumers disconnect (#14740)
db3ad55 is described below

commit db3ad550456f078a1520203e0da985031a053263
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Mar 21 09:10:51 2022 -0700

    [fix][broker] Fixed duplicated delayed messages when all consumers disconnect (#14740)
---
 .../PersistentDispatcherMultipleConsumers.java     |  1 +
 .../service/persistent/DelayedDeliveryTest.java    | 46 ++++++++++++++++++++++
 2 files changed, 47 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 7ec6f4d..189ef9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -146,6 +146,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 shouldRewindBeforeReadingOrReplaying = false;
             }
             redeliveryMessages.clear();
+            delayedDeliveryTracker.ifPresent(DelayedDeliveryTracker::clear);
         }
 
         if (isConsumersExceededOnSubscription()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index cb870f8..480da2f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -493,4 +493,50 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         admin.topics().skipAllMessages(topic, subName);
         Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
     }
+
+    @Test
+    public void testDelayedDeliveryWithAllConsumersDisconnecting() throws Exception {
+        String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelays");
+
+        Consumer<String> c1 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        producer.newMessage()
+                    .value("msg")
+                    .deliverAfter(5, TimeUnit.SECONDS)
+                    .send();
+
+        Dispatcher dispatcher = pulsar.getBrokerService().getTopicReference(topic).get().getSubscription("sub").getDispatcher();
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1));
+
+        c1.close();
+
+        // Attach a new consumer. Since there are no consumers connected, this will trigger the cursor rewind
+        @Cleanup
+        Consumer<String> c2 = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .receiverQueueSize(1)
+                .subscribe();
+
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 1));
+
+        Message<String> msg = c2.receive(10, TimeUnit.SECONDS);
+        assertNotNull(msg);
+
+        // No more messages
+        msg = c2.receive(1, TimeUnit.SECONDS);
+        assertNull(msg);
+
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
+    }
 }