Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/14 06:45:15 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #15162: [fix][java-client] Fix performance regression with message listener

eolivelli commented on code in PR #15162:
URL: https://github.com/apache/pulsar/pull/15162#discussion_r850130570


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -962,33 +961,34 @@ protected void tryTriggerListener() {
     }
 
     private void triggerListener() {
+        // The messages are added into the receiver queue by the internal pinned executor,
+        // so need to use internal pinned executor to avoid race condition which message
+        // might be added into the receiver queue but not able to read here.
         internalPinnedExecutor.execute(() -> {
             try {
-                // Listener should only have one pending/running executable to process a message
-                // See https://github.com/apache/pulsar/issues/11008 for context.
-                if (!isListenerHandlingMessage) {
-                    final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                Message<T> msg;
+                do {
+                    msg = internalReceive(0, TimeUnit.MILLISECONDS);
                     if (msg != null) {
-                        isListenerHandlingMessage = true;
                         // Trigger the notification on the message listener in a separate thread to avoid blocking the
                         // internal pinned executor thread while the message processing happens
+                        final Message<T> finalMsg = msg;
                         if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
                             executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
-                                    callMessageListener(msg));
+                                    callMessageListener(finalMsg));
                         } else {
                             getExternalExecutor(msg).execute(() -> {
-                                callMessageListener(msg);
+                                callMessageListener(finalMsg);
                             });
                         }
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
+                        }
                     }
-                }
+                } while (msg != null);

Review Comment:
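   My understanding is that this loop will keep occupying the internal pinned executor thread for as long as the receiver queue is non-empty, potentially forever.
   Other consumers that share this thread won't be able to use it in the meantime.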
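
To illustrate the concern, here is a minimal sketch. It is not Pulsar code: the class `PinnedExecutorSketch` and its methods are hypothetical, and it assumes, as the comment suggests, that one single-threaded executor is shared by several consumers. `drainAll` mimics the do/while loop in the diff, while `drainBounded` shows one possible mitigation (handle a small batch, then re-submit) that is only an assumption here, not something the PR proposes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PinnedExecutorSketch {

    // Drains the whole queue inside one task, like the do/while loop in the diff.
    static void drainAll(Queue<String> queue, List<String> events, CountDownLatch done) {
        String msg;
        while ((msg = queue.poll()) != null) {
            record(msg, events, done);
        }
    }

    // Hypothetical alternative: handle at most maxBatch messages, then re-submit
    // the task so that other work queued on the shared thread gets a turn.
    static void drainBounded(ExecutorService executor, Queue<String> queue,
                             List<String> events, CountDownLatch done, int maxBatch) {
        for (int i = 0; i < maxBatch; i++) {
            String msg = queue.poll();
            if (msg == null) {
                return;
            }
            record(msg, events, done);
        }
        if (!queue.isEmpty()) {
            executor.execute(() -> drainBounded(executor, queue, events, done, maxBatch));
        }
    }

    static void record(String msg, List<String> events, CountDownLatch done) {
        events.add(msg);
        done.countDown();
    }

    // Consumer A has four queued messages; consumer B has one task on the same thread.
    static List<String> run(boolean bounded) throws InterruptedException {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        Queue<String> queueA = new ArrayDeque<>(Arrays.asList("A1", "A2", "A3", "A4"));
        List<String> events = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(5);
        // Submit both tasks from inside the executor so their order is deterministic.
        shared.execute(() -> {
            shared.execute(() -> {
                if (bounded) {
                    drainBounded(shared, queueA, events, done, 2);
                } else {
                    drainAll(queueA, events, done);
                }
            });
            shared.execute(() -> record("B1", events, done));
        });
        done.await();
        shared.shutdown();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(false)); // prints [A1, A2, A3, A4, B1]
        System.out.println(run(true));  // prints [A1, A2, B1, A3, A4]
    }
}
```

In the first run, consumer B waits until consumer A's queue is fully drained; if A's queue were refilled as fast as it is emptied, B would never run. In the second run, B is served after A's first batch.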


