You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/05 03:09:42 UTC

[pulsar] branch master updated: Replay delayed messages in order. (#7731)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 79df097  Replay delayed messages in order. (#7731)
79df097 is described below

commit 79df097d18ed29ba0c9a35e07437ba98ae218555
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Aug 5 11:09:23 2020 +0800

    Replay delayed messages in order. (#7731)
    
    ### Motivation
    
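    Replay delayed messages in order: when delayed delivery is enabled on the
    topic, the dispatcher now uses an in-order replay path instead of the
    default replay path.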
    
    ### Verifying this change
    
    A new unit test was added.
---
 .../PersistentDispatcherMultipleConsumers.java     |  6 ++-
 .../service/persistent/DelayedDeliveryTest.java    | 44 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c24e762..b8607f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -336,7 +336,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 }
 
                 havePendingReplayRead = true;
-                Set<? extends Position> deletedMessages = asyncReplayEntries(messagesToReplayNow);
+                Set<? extends Position> deletedMessages = topic.delayedDeliveryEnabled ?
+                        asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
                 // clear already acked positions from replay bucket
 
                 deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
@@ -372,6 +373,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         return cursor.asyncReplayEntries(positions, this, ReadType.Replay);
     }
 
+    protected Set<? extends Position> asyncReplayEntriesInOrder(Set<? extends Position> positions) {
+        return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
+    }
 
     @Override
     public boolean isConsumerConnected() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 3cd2e25..82f6241 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -22,6 +22,8 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
@@ -32,6 +34,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.testng.annotations.AfterClass;
@@ -271,4 +274,45 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         }
         t.interrupt();
     }
+
+    @Test
+    public void testOrderingDispatch() throws PulsarClientException {
+        String topic = "persistent://public/default/testOrderingDispatch-" + System.nanoTime();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("shared-sub")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        final int N = 1000;
+
+        for (int i = 0; i < N; i++) {
+            producer.newMessage()
+                    .value("msg-" + i)
+                    .deliverAfter(5, TimeUnit.SECONDS)
+                    .send();
+        }
+
+        List<Message<String>> receives = new ArrayList<>(N);
+        for (int i = 0; i < N; i++) {
+            Message<String> received = consumer.receive();
+            receives.add(received);
+            consumer.acknowledge(received);
+        }
+
+        assertEquals(receives.size(), N);
+
+        for (int i = 0; i < N; i++) {
+            if (i < N - 1) {
+                assertTrue(receives.get(i).getMessageId().compareTo(receives.get(i + 1).getMessageId()) < 0);
+            }
+        }
+    }
 }