You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by wa...@apache.org on 2022/03/31 20:53:13 UTC

[pulsar] branch dave2wave-fix-performance-of-client updated (2a786c1 -> 5d65f56)

This is an automated email from the ASF dual-hosted git repository.

wave pushed a change to branch dave2wave-fix-performance-of-client
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 2a786c1  Fix Regression in Consumer Performance
     new 907b8a7  Revert "Fix Regression in Consumer Performance"
     new 5d65f56  Revert "Revert "Fix Regression in Consumer Performance""

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

[pulsar] 02/02: Revert "Revert "Fix Regression in Consumer Performance""

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wave pushed a commit to branch dave2wave-fix-performance-of-client
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5d65f5617e9daa4dc0764f26246fa160d83e61b2
Author: Dave Fisher <da...@datastax.com>
AuthorDate: Thu Mar 31 13:51:33 2022 -0700

    Revert "Revert "Fix Regression in Consumer Performance""
    
    This reverts commit 907b8a7c55cddbe4557931190bac328e220ea44d.
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 48 +++++++++++-----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 2 files changed, 25 insertions(+), 26 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index dd21666..b0b4879 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -908,35 +908,33 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected void triggerListener() {
         // Use internalPinnedExecutor to maintain message ordering
-        internalPinnedExecutor.execute(() -> {
-            try {
-                // Listener should only have one pending/running executable to process a message
-                // See https://github.com/apache/pulsar/issues/11008 for context.
-                if (!isListenerHandlingMessage) {
-                    final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
-                    if (msg != null) {
-                        isListenerHandlingMessage = true;
-                        // Trigger the notification on the message listener in a separate thread to avoid blocking the
-                        // internal pinned executor thread while the message processing happens
-                        if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
-                            executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
-                                    callMessageListener(msg));
-                        } else {
-                            getExternalExecutor(msg).execute(() -> {
-                                callMessageListener(msg);
-                            });
-                        }
+        try {
+            // Listener should only have one pending/running executable to process a message
+            // See https://github.com/apache/pulsar/issues/11008 for context.
+            if (!isListenerHandlingMessage) {
+                final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                if (msg != null) {
+                    isListenerHandlingMessage = true;
+                    // Trigger the notification on the message listener in a separate thread to avoid blocking the
+                    // internal pinned executor thread while the message processing happens
+                    if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
+                        executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
+                            callMessageListener(msg));
+                    } else {
+                        getExternalExecutor(msg).execute(() -> {
+                            callMessageListener(msg);
+                        });
                     }
                 }
-            } catch (PulsarClientException e) {
-                log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
-                return;
             }
+        } catch (PulsarClientException e) {
+            log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+            return;
+        }
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
-            }
-        });
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
+        }
     }
 
     protected void callMessageListener(Message<T> msg) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ebf3583..cd111f0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1085,7 +1085,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
             uncompressedPayload.release();
         }
-        tryTriggerListener();
+        internalPinnedExecutor.execute(()
+                -> tryTriggerListener());
 
     }
 

[pulsar] 01/02: Revert "Fix Regression in Consumer Performance"

Posted by wa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

wave pushed a commit to branch dave2wave-fix-performance-of-client
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 907b8a7c55cddbe4557931190bac328e220ea44d
Author: Dave Fisher <da...@datastax.com>
AuthorDate: Thu Mar 31 13:51:25 2022 -0700

    Revert "Fix Regression in Consumer Performance"
    
    This reverts commit 2a786c10c01b0686e7bce0eba07035c9989a0bad.
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 48 +++++++++++-----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  3 +-
 2 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index b0b4879..dd21666 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -908,33 +908,35 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected void triggerListener() {
         // Use internalPinnedExecutor to maintain message ordering
-        try {
-            // Listener should only have one pending/running executable to process a message
-            // See https://github.com/apache/pulsar/issues/11008 for context.
-            if (!isListenerHandlingMessage) {
-                final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
-                if (msg != null) {
-                    isListenerHandlingMessage = true;
-                    // Trigger the notification on the message listener in a separate thread to avoid blocking the
-                    // internal pinned executor thread while the message processing happens
-                    if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
-                        executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
-                            callMessageListener(msg));
-                    } else {
-                        getExternalExecutor(msg).execute(() -> {
-                            callMessageListener(msg);
-                        });
+        internalPinnedExecutor.execute(() -> {
+            try {
+                // Listener should only have one pending/running executable to process a message
+                // See https://github.com/apache/pulsar/issues/11008 for context.
+                if (!isListenerHandlingMessage) {
+                    final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                    if (msg != null) {
+                        isListenerHandlingMessage = true;
+                        // Trigger the notification on the message listener in a separate thread to avoid blocking the
+                        // internal pinned executor thread while the message processing happens
+                        if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
+                            executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
+                                    callMessageListener(msg));
+                        } else {
+                            getExternalExecutor(msg).execute(() -> {
+                                callMessageListener(msg);
+                            });
+                        }
                     }
                 }
+            } catch (PulsarClientException e) {
+                log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+                return;
             }
-        } catch (PulsarClientException e) {
-            log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
-            return;
-        }
 
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
-        }
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
+            }
+        });
     }
 
     protected void callMessageListener(Message<T> msg) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index cd111f0..ebf3583 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1085,8 +1085,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
             uncompressedPayload.release();
         }
-        internalPinnedExecutor.execute(()
-                -> tryTriggerListener());
+        tryTriggerListener();
 
     }