You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/07/13 09:59:27 UTC

[pulsar] branch master updated: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription (#16304)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f702190948 [fix][broker] Do not use IO thread for consumerFlow in Shared subscription (#16304)
9f702190948 is described below

commit 9f7021909486ac97590b17a6c245367900ab989c
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Jul 13 11:59:19 2022 +0200

    [fix][broker] Do not use IO thread for consumerFlow in Shared subscription (#16304)
---
 .../service/persistent/PersistentDispatcherMultipleConsumers.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6d6e8a72f00..6af58557a83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -211,7 +211,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     }
 
     @Override
-    public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
+    public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
+        topic.getBrokerService().executor().execute(() -> {
+            internalConsumerFlow(consumer, additionalNumberOfMessages);
+        });
+    }
+
+    private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {
         if (!consumerSet.contains(consumer)) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Ignoring flow control from disconnected consumer {}", name, consumer);