You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/26 02:30:01 UTC

[pulsar] branch master updated: [enh] Broker - Shared subscription: run filters in a separate (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread) (#16603)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new abe5d15263b [enh] Broker - Shared subscription: run filters in a separate (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread) (#16603)
abe5d15263b is described below

commit abe5d15263b1bca5d2bf606d013b548399086906
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Jul 26 04:29:55 2022 +0200

    [enh] Broker - Shared subscription: run filters in a separate (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread) (#16603)
---
 conf/broker.conf                                          |  3 +++
 .../org/apache/pulsar/broker/ServiceConfiguration.java    |  7 +++++++
 .../persistent/PersistentDispatcherMultipleConsumers.java | 15 +++++++++++++--
 .../PersistentStickyKeyDispatcherMultipleConsumers.java   |  2 +-
 .../broker/service/persistent/DelayedDeliveryTest.java    |  1 +
 ...ersistentStickyKeyDispatcherMultipleConsumersTest.java | 12 ++++++++++++
 6 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index c19afe981ca..7f07ae54449 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -406,6 +406,9 @@ dispatchThrottlingOnNonBacklogConsumerEnabled=true
 # Max number of entries to read from bookkeeper. By default it is 100 entries.
 dispatcherMaxReadBatchSize=100
 
+# Dispatch messages and execute broker side filters in a per-subscription thread
+dispatcherDispatchMessagesInSubscriptionThread=true
+
 # Max size in bytes of entries to read from bookkeeper. By default it is 5MB.
 dispatcherMaxReadSizeBytes=5242880
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ad4188d2882..f8c041c79b7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -990,6 +990,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private int dispatcherMaxReadBatchSize = 100;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_SERVER,
+            doc = "Dispatch messages and execute broker side filters in a per-subscription thread"
+    )
+    private boolean dispatcherDispatchMessagesInSubscriptionThread = true;
+
     // <-- dispatcher read settings -->
     @FieldContext(
         dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a7d52dcc098..71faeb7adba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
@@ -29,6 +30,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -109,6 +111,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
     private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
+    private final ExecutorService dispatchMessagesThread;
 
     protected enum ReadType {
         Normal, Replay
@@ -126,6 +129,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
         this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
         this.topic = topic;
+        this.dispatchMessagesThread = topic.getBrokerService().getTopicOrderedExecutor().chooseThread();
         this.redeliveryMessages = new MessageRedeliveryController(allowOutOfOrderDelivery);
         this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
                 ? new InMemoryRedeliveryTracker()
@@ -524,10 +528,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
         }
 
-        sendMessagesToConsumers(readType, entries);
+        if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
+            // dispatch messages to a separate thread, but still in order for this subscription
+            // sendMessagesToConsumers is responsible for running broker-side filters
+            // that may be quite expensive
+            dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
+        } else {
+            sendMessagesToConsumers(readType, entries);
+        }
     }
 
-    protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+    protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
 
         if (needTrimAckedMessages()) {
             cursor.trimDeletedEntries(entries);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 60762d8400a..558e3f129ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -152,7 +152,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
             };
 
     @Override
-    protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+    protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
         long totalEntries = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 8b62845572d..aa787907329 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -278,6 +278,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
         for (int i = 0; i < N; i++) {
             msg = consumer.receive(10, TimeUnit.SECONDS);
             receivedMsgs.add(msg.getValue());
+            consumer.acknowledge(msg);
         }
 
         assertEquals(receivedMsgs.size(), N);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 6a44e3dfcf7..72286b01c76 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import io.netty.channel.EventLoopGroup;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
@@ -84,6 +85,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
     private PersistentSubscription subscriptionMock;
     private ServiceConfiguration configMock;
     private ChannelPromise channelMock;
+    private OrderedExecutor orderedExecutor;
 
     private PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher;
 
@@ -107,6 +109,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies();
         topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0);
 
+        orderedExecutor = OrderedExecutor.newBuilder().build();
+        doReturn(orderedExecutor).when(brokerMock).getTopicOrderedExecutor();
+
         EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
         doReturn(eventLoopGroup).when(brokerMock).executor();
         doAnswer(invocation -> {
@@ -144,6 +149,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
             new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
     }
 
+    public void cleanup() {
+        if (orderedExecutor != null) {
+            orderedExecutor.shutdown();
+            orderedExecutor = null;
+        }
+    }
+
     @Test
     public void testSendMarkerMessage() {
         try {