You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/22 11:53:14 UTC
[pulsar] 05/07: [Java Client] Improve consumer listener logic (#13273)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e26afd99c9f91887eb63d9d05463f8e41c9a2548
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Tue Dec 21 15:17:21 2021 -0600
[Java Client] Improve consumer listener logic (#13273)
* [Java Client] Improve consumer listener logic
* Move the isListenerHandlingMessage update so that it happens before the task is submitted to the executor
(cherry picked from commit 9f46c4af5e0dad7f9223add57842591822c3dea3)
---
.../org/apache/pulsar/client/impl/ConsumerBase.java | 17 +++++++++--------
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 3 +--
2 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0f47208..20baf47 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -85,7 +84,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
protected final Lock reentrantLock = new ReentrantLock();
- private final AtomicInteger executorQueueSize = new AtomicInteger(0);
+ private volatile boolean isListenerHandlingMessage = false;
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorProvider executorProvider,
@@ -915,15 +914,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
}
protected void triggerListener() {
- // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
- // thread while the message processing happens
+ // Use internalPinnedExecutor to maintain message ordering
internalPinnedExecutor.execute(() -> {
try {
- // Control executor to call MessageListener one by one.
- if (executorQueueSize.get() < 1) {
+ // Listener should only have one pending/running executable to process a message
+ // See https://github.com/apache/pulsar/issues/11008 for context.
+ if (!isListenerHandlingMessage) {
final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
if (msg != null) {
- executorQueueSize.incrementAndGet();
+ isListenerHandlingMessage = true;
+ // Trigger the notification on the message listener in a separate thread to avoid blocking the
+ // internal pinned executor thread while the message processing happens
if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
callMessageListener(msg));
@@ -956,7 +957,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
msg.getMessageId(), t);
} finally {
- executorQueueSize.decrementAndGet();
+ isListenerHandlingMessage = false;
triggerListener();
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ff0b826..ac04651 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1127,8 +1127,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
increaseAvailablePermits(cnx(), skippedMessages.get());
}
- internalPinnedExecutor.execute(()
- -> tryTriggerListener());
+ tryTriggerListener();
}
void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {