You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/07 06:55:23 UTC
[pulsar] branch master updated: [Client] Add test to ensure the message order in listener callbacks (#15049)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c2c05c49aff [Client] Add test to ensure the message order in listener callbacks (#15049)
c2c05c49aff is described below
commit c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Apr 7 14:55:16 2022 +0800
[Client] Add test to ensure the message order in listener callbacks (#15049)
---
.../client/api/SimpleProducerConsumerTest.java | 30 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 9 +++++--
.../apache/pulsar/client/impl/ConsumerImpl.java | 6 -----
.../client/impl/MultiTopicsConsumerImpl.java | 4 +--
.../pulsar/client/impl/ZeroQueueConsumerImpl.java | 2 +-
5 files changed, 39 insertions(+), 12 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 6eed808f3e4..7506a9bf3f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -58,6 +58,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
@@ -4400,4 +4401,33 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test(invocationCount = 5) // repeated 5 times; presumably to surface intermittent misordering -- confirm
+ public void testListenerOrdering() throws Exception { // Sends numMessages strings and asserts the listener recorded them in send order.
+ final String topic = "persistent://my-property/my-ns/test-listener-ordering-" + System.currentTimeMillis(); // topic name carries the current time in millis
+ final int numMessages = 1000;
+ final CountDownLatch latch = new CountDownLatch(numMessages); // one count per expected message
+ final List<String> values = new CopyOnWriteArrayList<>(); // values in the order the listener appended them
+ final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub")
+ .messageListener((MessageListener<String>) (consumer1, msg) -> { // callback records the value, then counts down
+ values.add(msg.getValue());
+ latch.countDown();
+ })
+ .subscribe();
+ final Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic)
+ .create();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send("msg-" + i); // payloads are "msg-0" .. "msg-999", sent one after another
+ }
+ latch.await(3, TimeUnit.SECONDS); // waits at most 3 seconds; the boolean result is not checked, so a timeout surfaces in the size assertion below
+ producer.close();
+ consumer.close();
+ assertEquals(values.size(), numMessages); // every sent message was recorded by the listener
+ for (int i = 0; i < numMessages; i++) {
+ assertEquals(values.get(i), "msg-" + i); // i-th recorded value must equal the i-th value sent
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 98dc43a1ebc..6215562160a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -955,8 +955,13 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
}
- protected void triggerListener() {
- // Use internalPinnedExecutor to maintain message ordering
+ protected void tryTriggerListener() { // Calls triggerListener() only when a listener is set; otherwise does nothing.
+ if (listener != null) {
+ triggerListener();
+ }
+ }
+
+ private void triggerListener() {
internalPinnedExecutor.execute(() -> {
try {
// Listener should only have one pending/running executable to process a message
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 74bda5e4e91..e666dae63b7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1343,12 +1343,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
- private void tryTriggerListener() {
- if (listener != null) {
- triggerListener();
- }
- }
-
private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
MessageIdData messageId, ClientCnx cnx) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 57fc2b07327..21e0b9941ce 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -305,9 +305,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
notifyPendingBatchReceivedCallBack();
}
- if (listener != null) {
- triggerListener();
- }
+ tryTriggerListener();
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 5c7b85b93ec..9f0f2e215a5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -171,7 +171,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
}
@Override
- protected void triggerListener() {
+ protected void tryTriggerListener() {
// Ignore since it was already triggered in the triggerZeroQueueSizeListener() call
}