You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/05 03:09:42 UTC
[pulsar] branch master updated: Replay delayed messages in order.
(#7731)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 79df097 Replay delayed messages in order. (#7731)
79df097 is described below
commit 79df097d18ed29ba0c9a35e07437ba98ae218555
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Aug 5 11:09:23 2020 +0800
Replay delayed messages in order. (#7731)
### Motivation
Replay delayed messages in order.
### Verifying this change
A new unit test added.
---
.../PersistentDispatcherMultipleConsumers.java | 6 ++-
.../service/persistent/DelayedDeliveryTest.java | 44 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index c24e762..b8607f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -336,7 +336,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
havePendingReplayRead = true;
- Set<? extends Position> deletedMessages = asyncReplayEntries(messagesToReplayNow);
+ Set<? extends Position> deletedMessages = topic.delayedDeliveryEnabled ?
+ asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket
deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(),
@@ -372,6 +373,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return cursor.asyncReplayEntries(positions, this, ReadType.Replay);
}
+ protected Set<? extends Position> asyncReplayEntriesInOrder(Set<? extends Position> positions) {
+ return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
+ }
@Override
public boolean isConsumerConnected() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 3cd2e25..82f6241 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -22,6 +22,8 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@@ -32,6 +34,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.annotations.AfterClass;
@@ -271,4 +274,45 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
}
t.interrupt();
}
+
+ @Test
+ public void testOrderingDispatch() throws PulsarClientException {
+ String topic = "persistent://public/default/testOrderingDispatch-" + System.nanoTime();
+
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("shared-sub")
+ .subscriptionType(SubscriptionType.Shared)
+ .subscribe();
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+
+ final int N = 1000;
+
+ for (int i = 0; i < N; i++) {
+ producer.newMessage()
+ .value("msg-" + i)
+ .deliverAfter(5, TimeUnit.SECONDS)
+ .send();
+ }
+
+ List<Message<String>> receives = new ArrayList<>(N);
+ for (int i = 0; i < N; i++) {
+ Message<String> received = consumer.receive();
+ receives.add(received);
+ consumer.acknowledge(received);
+ }
+
+ assertEquals(receives.size(), N);
+
+ for (int i = 0; i < N; i++) {
+ if (i < N - 1) {
+ assertTrue(receives.get(i).getMessageId().compareTo(receives.get(i + 1).getMessageId()) < 0);
+ }
+ }
+ }
}