You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/07/13 09:59:27 UTC
[pulsar] branch master updated: [fix][broker] Do not use IO thread for consumerFlow in Shared subscription (#16304)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f702190948 [fix][broker] Do not use IO thread for consumerFlow in Shared subscription (#16304)
9f702190948 is described below
commit 9f7021909486ac97590b17a6c245367900ab989c
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Jul 13 11:59:19 2022 +0200
[fix][broker] Do not use IO thread for consumerFlow in Shared subscription (#16304)
---
.../service/persistent/PersistentDispatcherMultipleConsumers.java | 8 +++++++-
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6d6e8a72f00..6af58557a83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -211,7 +211,13 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
}
@Override
- public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
+ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
+ topic.getBrokerService().executor().execute(() -> {
+ internalConsumerFlow(consumer, additionalNumberOfMessages);
+ });
+ }
+
+ private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {
if (!consumerSet.contains(consumer)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ignoring flow control from disconnected consumer {}", name, consumer);