You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/14 13:25:11 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #16603: [enh] Broker - Shared subscription: run filters in a separate (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread)

eolivelli commented on code in PR #16603:
URL: https://github.com/apache/pulsar/pull/16603#discussion_r921151871


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -520,10 +521,19 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
             log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
         }
 
-        sendMessagesToConsumers(readType, entries);
+        if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
+            // dispatch messages to a separate thread, but still in order for this subscription
+            // sendMessagesToConsumers is responsible for running broker-side filters
+            // that may be quite expensive
+            topic.getBrokerService().getTopicOrderedExecutor()
+                    .executeOrdered(name,
+                            safeRun(() -> sendMessagesToConsumers(readType, entries)));
+        } else {
+            sendMessagesToConsumers(readType, entries);
+        }
     }
 
-    protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+    protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

Review Comment:
   sendMessagesToConsumers is usually called by readEntriesComplete, that is already  `synchronized`
   https://github.com/apache/pulsar/blob/019493dbcf8e86c9caff71a88adc03aed5b1fbc8/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L492
   
   the JVM is smart enough to understand that there is no need for extra work for `synchronized`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org