You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/26 02:30:01 UTC
[pulsar] branch master updated: [enh] Broker - Shared subscription: run filters in a separate (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread) (#16603)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new abe5d15263b [enh] Broker - Shared subscription: run filters in a separate (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread) (#16603)
abe5d15263b is described below
commit abe5d15263b1bca5d2bf606d013b548399086906
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Jul 26 04:29:55 2022 +0200
[enh] Broker - Shared subscription: run filters in a separate (per-subscription) thread (dispatcherDispatchMessagesInSubscriptionThread) (#16603)
---
conf/broker.conf | 3 +++
.../org/apache/pulsar/broker/ServiceConfiguration.java | 7 +++++++
.../persistent/PersistentDispatcherMultipleConsumers.java | 15 +++++++++++++--
.../PersistentStickyKeyDispatcherMultipleConsumers.java | 2 +-
.../broker/service/persistent/DelayedDeliveryTest.java | 1 +
...ersistentStickyKeyDispatcherMultipleConsumersTest.java | 12 ++++++++++++
6 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index c19afe981ca..7f07ae54449 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -406,6 +406,9 @@ dispatchThrottlingOnNonBacklogConsumerEnabled=true
# Max number of entries to read from bookkeeper. By default it is 100 entries.
dispatcherMaxReadBatchSize=100
+# Dispatch messages and execute broker-side filters in a per-subscription thread
+dispatcherDispatchMessagesInSubscriptionThread=true
+
# Max size in bytes of entries to read from bookkeeper. By default it is 5MB.
dispatcherMaxReadSizeBytes=5242880
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ad4188d2882..f8c041c79b7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -990,6 +990,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int dispatcherMaxReadBatchSize = 100;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_SERVER,
+ doc = "Dispatch messages and execute broker side filters in a per-subscription thread"
+ )
+ private boolean dispatcherDispatchMessagesInSubscriptionThread = true;
+
// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index a7d52dcc098..71faeb7adba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
@@ -29,6 +30,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -109,6 +111,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
+ private final ExecutorService dispatchMessagesThread;
protected enum ReadType {
Normal, Replay
@@ -126,6 +129,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
this.topic = topic;
+ this.dispatchMessagesThread = topic.getBrokerService().getTopicOrderedExecutor().chooseThread();
this.redeliveryMessages = new MessageRedeliveryController(allowOutOfOrderDelivery);
this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled()
? new InMemoryRedeliveryTracker()
@@ -524,10 +528,17 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}
- sendMessagesToConsumers(readType, entries);
+ if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
+ // dispatch messages to a separate thread, but still in order for this subscription
+ // sendMessagesToConsumers is responsible for running broker-side filters
+ // that may be quite expensive
+ dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
+ } else {
+ sendMessagesToConsumers(readType, entries);
+ }
}
- protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+ protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 60762d8400a..558e3f129ce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -152,7 +152,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
};
@Override
- protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
+ protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
index 8b62845572d..aa787907329 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java
@@ -278,6 +278,7 @@ public class DelayedDeliveryTest extends ProducerConsumerBase {
for (int i = 0; i < N; i++) {
msg = consumer.receive(10, TimeUnit.SECONDS);
receivedMsgs.add(msg.getValue());
+ consumer.acknowledge(msg);
}
assertEquals(receivedMsgs.size(), N);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 6a44e3dfcf7..72286b01c76 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import io.netty.channel.EventLoopGroup;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
@@ -84,6 +85,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
private PersistentSubscription subscriptionMock;
private ServiceConfiguration configMock;
private ChannelPromise channelMock;
+ private OrderedExecutor orderedExecutor;
private PersistentStickyKeyDispatcherMultipleConsumers persistentDispatcher;
@@ -107,6 +109,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
HierarchyTopicPolicies topicPolicies = new HierarchyTopicPolicies();
topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(0);
+ orderedExecutor = OrderedExecutor.newBuilder().build();
+ doReturn(orderedExecutor).when(brokerMock).getTopicOrderedExecutor();
+
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
doReturn(eventLoopGroup).when(brokerMock).executor();
doAnswer(invocation -> {
@@ -144,6 +149,13 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
}
+ public void cleanup() {
+ if (orderedExecutor != null) {
+ orderedExecutor.shutdown();
+ orderedExecutor = null;
+ }
+ }
+
@Test
public void testSendMarkerMessage() {
try {