You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/22 11:53:14 UTC

[pulsar] 05/07: [Java Client] Improve consumer listener logic (#13273)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e26afd99c9f91887eb63d9d05463f8e41c9a2548
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Tue Dec 21 15:17:21 2021 -0600

    [Java Client] Improve consumer listener logic (#13273)
    
    * [Java Client] Improve consumer listener logic
    
    * Move isListenerHandlingMessage update to before submitting to executor
    
    (cherry picked from commit 9f46c4af5e0dad7f9223add57842591822c3dea3)
---
 .../org/apache/pulsar/client/impl/ConsumerBase.java     | 17 +++++++++--------
 .../org/apache/pulsar/client/impl/ConsumerImpl.java     |  3 +--
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0f47208..20baf47 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -85,7 +84,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected volatile long incomingMessagesSize = 0;
     protected volatile Timeout batchReceiveTimeout = null;
     protected final Lock reentrantLock = new ReentrantLock();
-    private final AtomicInteger executorQueueSize = new AtomicInteger(0);
+    private volatile boolean isListenerHandlingMessage = false;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorProvider executorProvider,
@@ -915,15 +914,17 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected void triggerListener() {
-        // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
-        // thread while the message processing happens
+        // Use internalPinnedExecutor to maintain message ordering
         internalPinnedExecutor.execute(() -> {
             try {
-                // Control executor to call MessageListener one by one.
-                if (executorQueueSize.get() < 1) {
+                // Listener should only have one pending/running executable to process a message
+                // See https://github.com/apache/pulsar/issues/11008 for context.
+                if (!isListenerHandlingMessage) {
                     final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
                     if (msg != null) {
-                        executorQueueSize.incrementAndGet();
+                        isListenerHandlingMessage = true;
+                        // Trigger the notification on the message listener in a separate thread to avoid blocking the
+                        // internal pinned executor thread while the message processing happens
                         if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
                             executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
                                     callMessageListener(msg));
@@ -956,7 +957,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
                     msg.getMessageId(), t);
         } finally {
-            executorQueueSize.decrementAndGet();
+            isListenerHandlingMessage = false;
             triggerListener();
         }
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ff0b826..ac04651 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1127,8 +1127,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             increaseAvailablePermits(cnx(), skippedMessages.get());
         }
 
-        internalPinnedExecutor.execute(()
-                -> tryTriggerListener());
+        tryTriggerListener();
     }
 
     void messageReceived(MessageIdData messageId, int redeliveryCount, List<Long> ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {