You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by wa...@apache.org on 2022/03/31 20:46:04 UTC

[pulsar] branch branch-2.8 updated: Revert "Fix Regression in Consumer Performance"

This is an automated email from the ASF dual-hosted git repository.

wave pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 328f30c  Revert "Fix Regression in Consumer Performance"
328f30c is described below

commit 328f30c1fe0396a5e39f69b15133f979366df62a
Author: Dave Fisher <da...@datastax.com>
AuthorDate: Thu Mar 31 13:40:24 2022 -0700

    Revert "Fix Regression in Consumer Performance"
    
    This reverts commit 2a786c10c01b0686e7bce0eba07035c9989a0bad.
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 48 +++++++++++-----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 2 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index b0b4879..dd21666 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -908,33 +908,35 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected void triggerListener() {
         // Use internalPinnedExecutor to maintain message ordering
-        try {
-            // Listener should only have one pending/running executable to process a message
-            // See https://github.com/apache/pulsar/issues/11008 for context.
-            if (!isListenerHandlingMessage) {
-                final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
-                if (msg != null) {
-                    isListenerHandlingMessage = true;
-                    // Trigger the notification on the message listener in a separate thread to avoid blocking the
-                    // internal pinned executor thread while the message processing happens
-                    if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
-                        executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
-                            callMessageListener(msg));
-                    } else {
-                        getExternalExecutor(msg).execute(() -> {
-                            callMessageListener(msg);
-                        });
+        internalPinnedExecutor.execute(() -> {
+            try {
+                // Listener should only have one pending/running executable to process a message
+                // See https://github.com/apache/pulsar/issues/11008 for context.
+                if (!isListenerHandlingMessage) {
+                    final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                    if (msg != null) {
+                        isListenerHandlingMessage = true;
+                        // Trigger the notification on the message listener in a separate thread to avoid blocking the
+                        // internal pinned executor thread while the message processing happens
+                        if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
+                            executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
+                                    callMessageListener(msg));
+                        } else {
+                            getExternalExecutor(msg).execute(() -> {
+                                callMessageListener(msg);
+                            });
+                        }
                     }
                 }
+            } catch (PulsarClientException e) {
+                log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+                return;
             }
-        } catch (PulsarClientException e) {
-            log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
-            return;
-        }
 
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
-        }
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
+            }
+        });
     }
 
     protected void callMessageListener(Message<T> msg) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cd111f0..ebf3583 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1085,8 +1085,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
             uncompressedPayload.release();
         }
-        internalPinnedExecutor.execute(()
-                -> tryTriggerListener());
+        tryTriggerListener();
 
     }