You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/05/02 18:42:40 UTC
[pulsar] branch master updated: Fixed key-shared delivery of messages with interleaved delays (#15409)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2647b3254b2 Fixed key-shared delivery of messages with interleaved delays (#15409)
2647b3254b2 is described below
commit 2647b3254b2cfec471761de4ae49d0c7eadaaedb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon May 2 11:42:26 2022 -0700
Fixed key-shared delivery of messages with interleaved delays (#15409)
* Fixed key-shared delivery of messages with interleaved delays
* Fixed checkstyle
* Always add to redelivery messages
---
.../delayed/InMemoryDelayedDeliveryTracker.java | 5 ++-
.../PersistentDispatcherMultipleConsumers.java | 6 ++-
...istentStickyKeyDispatcherMultipleConsumers.java | 51 ++++++++++++----------
.../service/persistent/DelayedDeliveryTest.java | 44 +++++++++++++++++++
4 files changed, 81 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 80ec2185e56..d0eec0098b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -134,7 +134,8 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
@Override
public void resetTickTime(long tickTime) {
- if (this.tickTimeMillis != tickTime){
+
+ if (this.tickTimeMillis != tickTime) {
this.tickTimeMillis = tickTime;
}
}
@@ -199,7 +200,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
synchronized (dispatcher) {
currentTimeoutTarget = -1;
- timeout = null;
+ this.timeout = null;
dispatcher.readMoreEntries();
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index fbfb09b24a5..517b08a4bd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -270,7 +270,11 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
consumerList.size());
}
havePendingRead = true;
- minReplayedPosition = getMessagesToReplayNow(1).stream().findFirst().orElse(null);
+ Set<PositionImpl> toReplay = getMessagesToReplayNow(1);
+ minReplayedPosition = toReplay.stream().findFirst().orElse(null);
+ if (minReplayedPosition != null) {
+ redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
+ }
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index ed19bdc1358..da46bf449fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -174,29 +174,36 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
// This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
Set<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
- if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty() && this.minReplayedPosition != null) {
- PositionImpl relayPosition = messagesToReplayNow.stream().findFirst().get();
- // If relayPosition is a new entry wither smaller position is inserted for redelivery during this async
- // read, it is possible that this relayPosition should dispatch to consumer first. So in order to
- // preserver order delivery, we need to discard this read result, and try to trigger a replay read,
- // that containing "relayPosition", by calling readMoreEntries.
- if (relayPosition.compareTo(minReplayedPosition) < 0) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, discard this "
- + "read and retry with readMoreEntries.",
- name, relayPosition, minReplayedPosition, readType);
- }
- if (readType == ReadType.Normal) {
- entries.forEach(entry -> {
- long stickyKeyHash = getStickyKeyHash(entry);
- addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
- entry.release();
- });
- } else if (readType == ReadType.Replay) {
- entries.forEach(Entry::release);
+ if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) {
+ PositionImpl replayPosition = messagesToReplayNow.stream().findFirst().get();
+ // We have received a message potentially from the delayed tracker and, since we're not using it
+ // right now, it needs to be added to the redelivery tracker or we won't attempt anymore to
+ // resend it (until we disconnect consumer).
+ redeliveryMessages.add(replayPosition.getLedgerId(), replayPosition.getEntryId());
+
+ if (this.minReplayedPosition != null) {
+ // If relayPosition is a new entry wither smaller position is inserted for redelivery during this
+ // async read, it is possible that this relayPosition should dispatch to consumer first. So in
+ // order to preserver order delivery, we need to discard this read result, and try to trigger a
+ // replay read, that containing "relayPosition", by calling readMoreEntries.
+ if (replayPosition.compareTo(minReplayedPosition) < 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, "
+ + "discard this read and retry with readMoreEntries.",
+ name, replayPosition, minReplayedPosition, readType);
+ }
+ if (readType == ReadType.Normal) {
+ entries.forEach(entry -> {
+ long stickyKeyHash = getStickyKeyHash(entry);
+ addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
+ entry.release();
+ });
+ } else if (readType == ReadType.Replay) {
+ entries.forEach(Entry::release);
+ }
+ readMoreEntries();
+ return;
}
- readMoreEntries();
- return;
}
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 013f5ac6bc6..45131761824 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
@@ -538,4 +539,47 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
Awaitility.await().untilAsserted(() -> Assert.assertEquals(dispatcher.getNumberOfDelayedMessages(), 0));
}
+
+ @Test
+ public void testInterleavedMessagesOnKeySharedSubscription() throws Exception {
+ String topic = BrokerTestUtil.newUniqueName("testInterleavedMessagesOnKeySharedSubscription");
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("key-shared-sub")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ Random random = new Random(0);
+ for (int i = 0; i < 10; i++) {
+ // Publish 1 message without delay and 1 with delay
+ producer.newMessage()
+ .value("immediate-msg-" + i)
+ .sendAsync();
+
+ int delayMillis = 1000 + random.nextInt(1000);
+ producer.newMessage()
+ .value("delayed-msg-" + i)
+ .deliverAfter(delayMillis, TimeUnit.MILLISECONDS)
+ .sendAsync();
+ Thread.sleep(1000);
+ }
+
+ producer.flush();
+
+ Set<String> receivedMessages = new HashSet<>();
+
+ while (receivedMessages.size() < 20) {
+ Message<String> msg = consumer.receive(3, TimeUnit.SECONDS);
+ receivedMessages.add(msg.getValue());
+ consumer.acknowledge(msg);
+ }
+ }
+
}