You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/30 12:35:43 UTC

[pulsar] branch master updated: Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 73ef162  Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890)
73ef162 is described below

commit 73ef1621ab0bbecfcb2325453a4d93a406fcba3c
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Tue Nov 30 20:34:45 2021 +0800

    Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890)
---
 .../PersistentDispatcherMultipleConsumers.java     |  3 +++
 ...istentStickyKeyDispatcherMultipleConsumers.java | 31 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9bf2db9..a240c0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -83,6 +83,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
 
     protected volatile boolean havePendingRead = false;
     protected volatile boolean havePendingReplayRead = false;
+    protected volatile PositionImpl minReplayedPosition = null;
     protected boolean shouldRewindBeforeReadingOrReplaying = false;
     protected final String name;
 
@@ -244,6 +245,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 }
 
                 havePendingReplayRead = true;
+                minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
                 Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
                         ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
                 // clear already acked positions from replay bucket
@@ -267,6 +269,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                             consumerList.size());
                 }
                 havePendingRead = true;
+                minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null);
                 cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
                         ReadType.Normal, topic.getMaxReadPosition());
             } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 6f4c4eb..5c8f33e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -170,6 +170,37 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             return;
         }
 
+        // A corner case in which we have to retry readMoreEntries in order to preserve ordered delivery.
+        // This may happen when a consumer is closed. See issue #12885 for details.
+        if (!allowOutOfOrderDelivery) {
+            Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
+            if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) {
+                PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get();
+                // If relayPosition is a new entry with a smaller position that was inserted for redelivery during
+                // this async read, it is possible that relayPosition should be dispatched to the consumer first.
+                // So, in order to preserve ordered delivery, we need to discard this read result and try to trigger
+                // a replay read that contains "relayPosition" by calling readMoreEntries.
+                if (relayPosition.compareTo(minReplayedPosition) < 0) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this "
+                                + "read and retry with readMoreEntries.",
+                                name, relayPosition, minReplayedPosition, readType);
+                    }
+                    if (readType == ReadType.Normal) {
+                        entries.forEach(entry -> {
+                            long stickyKeyHash = getStickyKeyHash(entry);
+                            addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
+                            entry.release();
+                        });
+                    } else if (readType == ReadType.Replay) {
+                        entries.forEach(Entry::release);
+                    }
+                    readMoreEntries();
+                    return;
+                }
+            }
+        }
+
         nextStuckConsumers.clear();
 
         final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();