You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/30 12:35:43 UTC
[pulsar] branch master updated: Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 73ef162 Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890)
73ef162 is described below
commit 73ef1621ab0bbecfcb2325453a4d93a406fcba3c
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Tue Nov 30 20:34:45 2021 +0800
Fix Issue #12885, Unordered consuming case in Key_Shared subscription (#12890)
---
.../PersistentDispatcherMultipleConsumers.java | 3 +++
...istentStickyKeyDispatcherMultipleConsumers.java | 31 ++++++++++++++++++++++
2 files changed, 34 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 9bf2db9..a240c0b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -83,6 +83,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected volatile boolean havePendingRead = false;
protected volatile boolean havePendingReplayRead = false;
+ protected volatile PositionImpl minReplayedPosition = null;
protected boolean shouldRewindBeforeReadingOrReplaying = false;
protected final String name;
@@ -244,6 +245,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
havePendingReplayRead = true;
+ minReplayedPosition = messagesToReplayNow.stream().min(PositionImpl::compareTo).orElse(null);
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
@@ -267,6 +269,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
consumerList.size());
}
havePendingRead = true;
+ minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null);
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 6f4c4eb..5c8f33e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -170,6 +170,37 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return;
}
+ // A corner case in which we have to retry readMoreEntries in order to preserve ordered delivery.
+ // This may happen when a consumer is closed. See issue #12885 for details.
+ if (!allowOutOfOrderDelivery) {
+ Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
+ if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) {
+ PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get();
+ // If relayPosition is a new entry with a smaller position that was inserted for redelivery during
+ // this async read, it is possible that relayPosition should be dispatched to the consumer first. So,
+ // to preserve ordered delivery, we need to discard this read result and trigger a replay read
+ // that contains "relayPosition" by calling readMoreEntries.
+ if (relayPosition.compareTo(minReplayedPosition) < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this "
+ + "read and retry with readMoreEntries.",
+ name, relayPosition, minReplayedPosition, readType);
+ }
+ if (readType == ReadType.Normal) {
+ entries.forEach(entry -> {
+ long stickyKeyHash = getStickyKeyHash(entry);
+ addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
+ entry.release();
+ });
+ } else if (readType == ReadType.Replay) {
+ entries.forEach(Entry::release);
+ }
+ readMoreEntries();
+ return;
+ }
+ }
+ }
+
nextStuckConsumers.clear();
final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();