You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/19 14:19:02 UTC

[pulsar] branch master updated: [fix][java-client] Fix performance regression with message listener (#15162)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 83cd7911ec4 [fix][java-client] Fix performance regression with message listener (#15162)
83cd7911ec4 is described below

commit 83cd7911ec43085be223b18e9d3d58a4638fd36f
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Apr 19 22:18:52 2022 +0800

    [fix][java-client] Fix performance regression with message listener (#15162)
    
    ### Motivation
    
    https://github.com/apache/pulsar/pull/13023 has introduced a performance regression.
    For each message, we are switching from the external thread pool -> internal thread pool -> external thread pool.
    
    Previously, we wanted to control the number of outstanding messages of a consumer that uses a listener, so after #11455
    messages are no longer moved from the receiver queue to the external executor. #13023 then moved the listener trigger
    into the internal thread pool to fix an ordering issue, and this is the root cause of the performance regression.
    
    Here is the flame graph showing the stack frames of the internal thread and the external thread.
    [framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt)
    
    This change also fixes the performance issue for multi-topic consumers and Key_Shared subscriptions that have message listeners enabled. Before this change, messages were processed serially. After this change, parallelism is improved while message ordering is still guaranteed.
    
    ### Modification
    
    - Remove the isListenerHandlingMessage control
    - Move the messages from the receiver queue to the queue of the external executor without increasing permits
    - Increase permits before calling the message listener
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  1 +
 .../apache/pulsar/client/impl/ConsumerBase.java    | 37 ++++++++++++----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 21 +++++++++---
 .../client/impl/MultiTopicsConsumerImpl.java       |  6 ++--
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  2 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  2 +-
 6 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 9da75ea0512..e738e5916b4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -117,6 +117,7 @@ public class RawReaderImpl implements RawReader {
                     client.externalExecutorProvider(),
                     TopicName.getPartitionIndex(conf.getSingleTopic()),
                     false,
+                    false,
                     consumerFuture,
                     MessageId.earliest,
                     0 /* startMessageRollbackDurationInSec */,
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f9a7911935c..c0d6e2b1258 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -95,7 +95,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected volatile long incomingMessagesSize = 0;
     protected volatile Timeout batchReceiveTimeout = null;
     protected final Lock reentrantLock = new ReentrantLock();
-    private volatile boolean isListenerHandlingMessage = false;
 
     protected static final AtomicLongFieldUpdater<ConsumerBase> CONSUMER_EPOCH =
             AtomicLongFieldUpdater.newUpdater(ConsumerBase.class, "consumerEpoch");
@@ -984,33 +983,34 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     private void triggerListener() {
+        // The messages are added into the receiver queue by the internal pinned executor,
+        // so need to use internal pinned executor to avoid race condition which message
+        // might be added into the receiver queue but not able to read here.
         internalPinnedExecutor.execute(() -> {
             try {
-                // Listener should only have one pending/running executable to process a message
-                // See https://github.com/apache/pulsar/issues/11008 for context.
-                if (!isListenerHandlingMessage) {
-                    final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
+                Message<T> msg;
+                do {
+                    msg = internalReceive(0, TimeUnit.MILLISECONDS);
                     if (msg != null) {
-                        isListenerHandlingMessage = true;
                         // Trigger the notification on the message listener in a separate thread to avoid blocking the
                         // internal pinned executor thread while the message processing happens
+                        final Message<T> finalMsg = msg;
                         if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
                             executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
-                                    callMessageListener(msg));
+                                    callMessageListener(finalMsg));
                         } else {
                             getExternalExecutor(msg).execute(() -> {
-                                callMessageListener(msg);
+                                callMessageListener(finalMsg);
                             });
                         }
+                    } else {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
+                        }
                     }
-                }
+                } while (msg != null);
             } catch (PulsarClientException e) {
                 log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
-                return;
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
             }
         });
     }
@@ -1021,13 +1021,16 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                 log.debug("[{}][{}] Calling message listener for message {}", topic, subscription,
                         msg.getMessageId());
             }
+            ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl)
+                    ? ((TopicMessageImpl<T>) msg).receivedByconsumer : (ConsumerImpl) this;
+            // Increase the permits here since we will not increase permits while receive messages from consumer
+            // after enabled message listener.
+            receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg instanceof TopicMessageImpl
+                                ? ((TopicMessageImpl<T>) msg).getMessage() : msg));
             listener.received(ConsumerBase.this, msg);
         } catch (Throwable t) {
             log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
                     msg.getMessageId(), t);
-        } finally {
-            isListenerHandlingMessage = false;
-            triggerListener();
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3d8f7e56330..2bfe917e623 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -137,6 +137,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private final int partitionIndex;
     private final boolean hasParentConsumer;
+    private final boolean parentConsumerHasListener;
 
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
@@ -208,7 +209,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                                                Schema<T> schema,
                                                ConsumerInterceptors<T> interceptors,
                                                boolean createTopicIfDoesNotExist) {
-        return newConsumerImpl(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
+        return newConsumerImpl(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, false,
                 subscribeFuture, startMessageId, schema, interceptors, createTopicIfDoesNotExist, 0);
     }
 
@@ -218,6 +219,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                                                ExecutorProvider executorProvider,
                                                int partitionIndex,
                                                boolean hasParentConsumer,
+                                               boolean parentConsumerHasListener,
                                                CompletableFuture<Consumer<T>> subscribeFuture,
                                                MessageId startMessageId,
                                                Schema<T> schema,
@@ -231,6 +233,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     createTopicIfDoesNotExist);
         } else {
             return new ConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
+                    parentConsumerHasListener,
                     subscribeFuture, startMessageId,
                     startMessageRollbackDurationInSec /* rollback time in sec to start msgId */,
                     schema, interceptors, createTopicIfDoesNotExist);
@@ -239,7 +242,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
            ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
-           CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
+           boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
            long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors,
            boolean createTopicIfDoesNotExist) {
         super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema,
@@ -253,6 +256,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
         this.partitionIndex = partitionIndex;
         this.hasParentConsumer = hasParentConsumer;
+        this.parentConsumerHasListener = parentConsumerHasListener;
         this.priorityLevel = conf.getPriorityLevel();
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
@@ -1570,7 +1574,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (msgCnx != currentCnx) {
             // The processed message did belong to the old queue that was cleared after reconnection.
         } else {
-            increaseAvailablePermits(currentCnx);
+            if (listener == null && !parentConsumerHasListener) {
+                increaseAvailablePermits(currentCnx);
+            }
             stats.updateNumMsgsReceived(msg);
 
             trackMessage(msg);
@@ -1605,13 +1611,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
     }
 
+    void increaseAvailablePermits(MessageImpl<?> msg) {
+        ClientCnx currentCnx = cnx();
+        ClientCnx msgCnx = msg.getCnx();
+        if (msgCnx == currentCnx) {
+            increaseAvailablePermits(currentCnx);
+        }
+    }
+
     void increaseAvailablePermits(ClientCnx currentCnx) {
         increaseAvailablePermits(currentCnx, 1);
     }
 
     protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
         int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
-
         while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
             if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
                 sendFlowPermitsToBroker(currentCnx, available);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index c0c626a600a..54cef152013 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1056,7 +1056,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         configurationData.setStartPaused(paused);
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
                                     configurationData, client.externalExecutorProvider(),
-                                    partitionIndex, true, subFuture,
+                                    partitionIndex, true, listener != null, subFuture,
                                     startMessageId, schema, interceptors,
                                     createIfDoesNotExist, startMessageRollbackDurationInSec);
                         synchronized (pauseMutex) {
@@ -1086,7 +1086,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     internalConfig.setStartPaused(paused);
                     ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
                             client.externalExecutorProvider(), -1,
-                            true, subFuture, startMessageId, schema, interceptors,
+                            true, listener != null, subFuture, startMessageId, schema, interceptors,
                             createIfDoesNotExist, startMessageRollbackDurationInSec);
 
                     synchronized (pauseMutex) {
@@ -1390,7 +1390,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
                                 client, partitionName, configurationData,
                                 client.externalExecutorProvider(),
-                                partitionIndex, true, subFuture, startMessageId, schema, interceptors,
+                                partitionIndex, true, listener != null, subFuture, startMessageId, schema, interceptors,
                                 true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec);
                         synchronized (pauseMutex) {
                             if (paused) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 7701722249f..c4b5263736e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -128,7 +128,7 @@ public class ReaderImpl<T> implements Reader<T> {
                         this, readerConfiguration.getReaderInterceptorList());
         final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration,
-                executorProvider, partitionIdx, false, consumerFuture,
+                executorProvider, partitionIdx, false, false, consumerFuture,
                 readerConfiguration.getStartMessageId(), readerConfiguration.getStartMessageFromRollbackDurationInSec(),
                 schema, consumerInterceptors, true /* createTopicIfDoesNotExist */);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index e63d803237e..df7ca4789fd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -51,7 +51,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
              CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId, Schema<T> schema,
              ConsumerInterceptors<T> interceptors,
              boolean createTopicIfDoesNotExist) {
-        super(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, subscribeFuture,
+        super(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, false, subscribeFuture,
                 startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors,
                 createTopicIfDoesNotExist);
     }