You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/05/02 18:42:40 UTC

[pulsar] branch master updated: Fixed key-shared delivery of messages with interleaved delays (#15409)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2647b3254b2 Fixed key-shared delivery of messages with interleaved delays (#15409)
2647b3254b2 is described below

commit 2647b3254b2cfec471761de4ae49d0c7eadaaedb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon May 2 11:42:26 2022 -0700

    Fixed key-shared delivery of messages with interleaved delays (#15409)
    
    * Fixed key-shared delivery of messages with interleaved delays
    
    * Fixed checkstyle
    
    * Always add to redelivery messages
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  5 ++-
 .../PersistentDispatcherMultipleConsumers.java     |  6 ++-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 51 ++++++++++++----------
 .../service/persistent/DelayedDeliveryTest.java    | 44 +++++++++++++++++++
 4 files changed, 81 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 80ec2185e56..d0eec0098b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -134,7 +134,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
 
     @Override
     public void resetTickTime(long tickTime) {
-        if (this.tickTimeMillis != tickTime){
+
+        if (this.tickTimeMillis != tickTime) {
             this.tickTimeMillis = tickTime;
         }
     }
@@ -199,7 +200,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
 
         synchronized (dispatcher) {
             currentTimeoutTarget = -1;
-            timeout = null;
+            this.timeout = null;
             dispatcher.readMoreEntries();
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index fbfb09b24a5..517b08a4bd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -270,7 +270,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                             consumerList.size());
                 }
                 havePendingRead = true;
-                minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null);
+                Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
+                minReplayedPosition = toReplay.stream().findFirst().orElse(null);
+                if (minReplayedPosition != null) {
+                    redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
+                }
                 cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
                         ReadType.Normal, topic.getMaxReadPosition());
             } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index ed19bdc1358..da46bf449fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -174,29 +174,36 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         // This may happen when consumer closed. See issue #12885 for details.
         if (!allowOutOfOrderDelivery) {
             Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
-            if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) {
-                PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get();
-                // If relayPosition is a new entry wither smaller position is inserted for redelivery during this async
-                // read, it is possible that this relayPosition should dispatch to consumer first. So in order to
-                // preserver order delivery, we need to discard this read result, and try to trigger a replay read,
-                // that containing "relayPosition", by calling readMoreEntries.
-                if (relayPosition.compareTo(minReplayedPosition) < 0) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this "
-                                + "read and retry with readMoreEntries.",
-                                name, relayPosition, minReplayedPosition, readType);
-                    }
-                    if (readType == ReadType.Normal) {
-                        entries.forEach(entry -> {
-                            long stickyKeyHash = getStickyKeyHash(entry);
-                            addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
-                            entry.release();
-                        });
-                    } else if (readType == ReadType.Replay) {
-                        entries.forEach(Entry::release);
+            if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) {
+                PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get();
+                // We have received a message potentially from the delayed tracker and, since we're not using it
+                // right now, it needs to be added to the redelivery tracker; otherwise we would not attempt to
+                // resend it again (until the consumer is disconnected).
+                redeliveryMessages.add(replayPosition.getLedgerId(), replayPosition.getEntryId());
+
+                if (this.minReplayedPosition != null) {
+                    // If replayPosition is a new entry with a smaller position that was inserted for redelivery
+                    // during this async read, it may need to be dispatched to the consumer first. So, in order
+                    // to preserve ordered delivery, we discard this read result and trigger a replay read that
+                    // contains replayPosition by calling readMoreEntries.
+                    if (replayPosition.compareTo(minReplayedPosition) < 0) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, "
+                                            + "discard this read and retry with readMoreEntries.",
+                                    name, replayPosition, minReplayedPosition, readType);
+                        }
+                        if (readType == ReadType.Normal) {
+                            entries.forEach(entry -> {
+                                long stickyKeyHash = getStickyKeyHash(entry);
+                                addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
+                                entry.release();
+                            });
+                        } else if (readType == ReadType.Replay) {
+                            entries.forEach(Entry::release);
+                        }
+                        readMoreEntries();
+                        return;
                     }
-                    readMoreEntries();
-                    return;
                 }
             }
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 013f5ac6bc6..45131761824 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -538,4 +539,47 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
 
         Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
     }
+
+    @Test
+    public void testInterleavedMessagesOnKeySharedSubscription() throws Exception {
+        String topic = BrokerTestUtil.newUniqueName("testInterleavedMessagesOnKeySharedSubscription");
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("key-shared-sub")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        Random random = new Random(0);
+        for (int i = 0; i < 10; i++) {
+            // Publish 1 message without delay and 1 with delay
+            producer.newMessage()
+                    .value("immediate-msg-" + i)
+                    .sendAsync();
+
+            int delayMillis = 1000 + random.nextInt(1000);
+            producer.newMessage()
+                    .value("delayed-msg-" + i)
+                    .deliverAfter(delayMillis, TimeUnit.MILLISECONDS)
+                    .sendAsync();
+            Thread.sleep(1000);
+        }
+
+        producer.flush();
+
+        Set<String> receivedMessages = new HashSet<>();
+
+        while (receivedMessages.size() < 20) {
+            Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+            receivedMessages.add(msg.getValue());
+            consumer.acknowledge(msg);
+        }
+    }
+
 }