You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/29 05:02:49 UTC
[pulsar] branch master updated: [feat][broker] Add config to count filtered entries towards rate limits (#17686)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new db26073728b [feat][broker] Add config to count filtered entries towards rate limits (#17686)
db26073728b is described below
commit db26073728bf86fc80deecaece2dc02b50bbb9b5
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Wed Sep 28 22:02:41 2022 -0700
[feat][broker] Add config to count filtered entries towards rate limits (#17686)
* [feat][broker] Add config to count filtered entries towards rate limits
* Make fixes for checkstyle
* Remove * import
* Fix incorrect conflict resolution in merge commit
### Motivation
Currently, when entry filters are in use, filtered-out messages do not count against the rate limit. Therefore, a subscription whose messages are all filtered out will never be throttled by rate limiting. When some messages are delivered to the consumer of a filtered subscription, those delivered messages do count against the rate limit, and in that case the message filtering can be throttled because the check to delay `readMoreEntries()` happens before message filtering. Therefore, the rate limit wi [...]
It's possible that some use cases prefer this behavior, but in my case, I think it'd be valuable to include these filtered messages in the dispatch throttling because these messages still cost the broker network, memory, and CPU resources. This PR adds a configuration option to count filtered-out messages towards the dispatch rate limits for the broker, the topic, and the subscription.
### Modifications
* Add a configuration option named `dispatchThrottlingForFilteredEntriesEnabled`. It defaults to `false` so that the original behavior is maintained. When it is `true`, filtered messages are counted against rate limits.
* Move the permit-acquisition logic, previously duplicated across the persistent dispatchers, into a shared `acquirePermitsForDeliveredMessages` method in `AbstractBaseDispatcher`, which makes it available to the entry filtering logic.
### Verifying this change
A new test is added as part of this PR.
### Does this pull request potentially affect one of the following parts:
This PR introduces a new config while maintaining the current behavior.
### Documentation
- [x] `doc-not-needed`
Config docs are auto-generated.
---
conf/broker.conf | 6 ++++
.../apache/pulsar/broker/ServiceConfiguration.java | 10 +++++++
.../broker/service/AbstractBaseDispatcher.java | 27 ++++++++++++++++++
.../PersistentDispatcherMultipleConsumers.java | 21 ++------------
.../PersistentDispatcherSingleActiveConsumer.java | 19 ++-----------
...istentStickyKeyDispatcherMultipleConsumers.java | 14 +--------
.../broker/service/AbstractBaseDispatcherTest.java | 33 ++++++++++++++++++----
7 files changed, 75 insertions(+), 55 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 30e79ebc9f0..d117d679c85 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -453,6 +453,12 @@ entryFilterNames=
# The directory for all the entry filter implementations
entryFiltersDirectory=
+# Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled,
+# only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and
+# subscription level. When enabled, messages filtered out due to entry filter logic are counted towards
+# each relevant rate limit.
+dispatchThrottlingForFilteredEntriesEnabled=false
+
# Whether allow topic level entry filters policies overrides broker configuration.
allowOverrideEntryFilters=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a6e9a556820..8c883045e66 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1050,6 +1050,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean dispatcherDispatchMessagesInSubscriptionThread = true;
+ @FieldContext(
+ dynamic = false,
+ category = CATEGORY_SERVER,
+ doc = "Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, "
+ + "only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and "
+ + "subscription level. When enabled, messages filtered out due to entry filter logic are counted towards "
+ + "each relevant rate limit."
+ )
+ private boolean dispatchThrottlingForFilteredEntriesEnabled = false;
+
// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 29710067a61..df02bbd85d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -107,6 +107,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
+ int filteredMessageCount = 0;
+ int filteredEntryCount = 0;
+ long filteredBytesCount = 0;
final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters);
List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
@@ -135,6 +138,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRejectedMsgs.add(entryMsgCnt);
+ filteredEntryCount++;
+ filteredMessageCount += entryMsgCnt;
+ filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
@@ -143,6 +149,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRescheduledMsgs.add(entryMsgCnt);
+ filteredEntryCount++;
+ filteredMessageCount += entryMsgCnt;
+ filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
}
@@ -231,6 +240,11 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
}
+ if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
+ acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount,
+ filteredMessageCount, filteredBytesCount);
+ }
+
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
@@ -243,6 +257,19 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
}
}
+ protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cursor, long totalEntries,
+ long totalMessagesSent, long totalBytesSent) {
+ if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled()
+ || (cursor != null && !cursor.isActive())) {
+ long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
+ topic.getBrokerDispatchRateLimiter().ifPresent(rateLimiter ->
+ rateLimiter.tryDispatchPermit(permits, totalBytesSent));
+ topic.getDispatchRateLimiter().ifPresent(rateLimter ->
+ rateLimter.tryDispatchPermit(permits, totalBytesSent));
+ getRateLimiter().ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(permits, totalBytesSent));
+ }
+ }
+
/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 02d2e725379..15b42fedd38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -684,7 +684,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
totalBytesSent += sendMessageInfo.getTotalBytes();
}
- acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
+ acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
if (entriesToDispatch > 0) {
if (log.isDebugEnabled()) {
@@ -700,23 +700,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
return true;
}
- private void acquirePermitsForDeliveredMessages(long totalEntries, long totalMessagesSent, long totalBytesSent) {
- // acquire message-dispatch permits for already delivered messages
- long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
- if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
- if (topic.getBrokerDispatchRateLimiter().isPresent()) {
- topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
- }
- if (topic.getDispatchRateLimiter().isPresent()) {
- topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
- }
-
- if (dispatchRateLimiter.isPresent()) {
- dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
- }
- }
- }
-
private boolean sendChunkedMessagesToConsumers(ReadType readType,
List<Entry> entries,
MessageMetadata[] metadataArray) {
@@ -775,7 +758,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
totalBytesSent += sendMessageInfo.getTotalBytes();
}
- acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
+ acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
return numConsumers.get() == 0; // trigger a new readMoreEntries() call
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index accab20d2da..3ba7a82aa5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -221,23 +221,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
- int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size()
- : sendMessageInfo.getTotalMessages();
- // acquire message-dispatch permits for already delivered messages
- if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
- if (topic.getBrokerDispatchRateLimiter().isPresent()) {
- topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits,
- sendMessageInfo.getTotalBytes());
- }
-
- if (topic.getDispatchRateLimiter().isPresent()) {
- topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
- sendMessageInfo.getTotalBytes());
- }
- dispatchRateLimiter.ifPresent(rateLimiter ->
- rateLimiter.tryDispatchPermit(permits,
- sendMessageInfo.getTotalBytes()));
- }
+ acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
+ sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
// Schedule a new read batch operation only after the previous batch has been written to the socket.
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 024ed8581ef..5eb553106e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -296,19 +296,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
// acquire message-dispatch permits for already delivered messages
- if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
- long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
- if (topic.getBrokerDispatchRateLimiter().isPresent()) {
- topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
- }
- if (topic.getDispatchRateLimiter().isPresent()) {
- topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
- }
-
- if (dispatchRateLimiter.isPresent()) {
- dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
- }
- }
+ acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
stuckConsumers.clear();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index b129995a8cc..cba15b06310 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
@@ -29,11 +30,14 @@ import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
@@ -60,8 +64,9 @@ public class AbstractBaseDispatcherTest {
@BeforeMethod
public void setup() throws Exception {
this.svcConfig = mock(ServiceConfiguration.class);
+ when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(true);
this.subscriptionMock = mock(PersistentSubscription.class);
- this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+ this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, null);
}
@Test
@@ -89,17 +94,24 @@ public class AbstractBaseDispatcherTest {
EntryFilter.FilterResult.REJECT);
Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key", mockFilter);
when(mockTopic.getEntryFilters()).thenReturn(entryFilters);
+ DispatchRateLimiter subscriptionDispatchRateLimiter = mock(DispatchRateLimiter.class);
- this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+ this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig,
+ subscriptionDispatchRateLimiter);
List<Entry> entries = new ArrayList<>();
- entries.add(EntryImpl.create(1, 2, createMessage("message1", 1)));
+ Entry e = EntryImpl.create(1, 2, createMessage("message1", 1));
+ long expectedBytePermits = e.getLength();
+ entries.add(e);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
- //
- int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
+
+ ManagedCursor cursor = mock(ManagedCursor.class);
+
+ int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, cursor, false, null);
assertEquals(size, 0);
+ verify(subscriptionDispatchRateLimiter).tryDispatchPermit(1, expectedBytePermits);
}
@Test
@@ -201,9 +213,18 @@ public class AbstractBaseDispatcherTest {
private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {
+ private final Optional<DispatchRateLimiter> dispatchRateLimiter;
+
protected AbstractBaseDispatcherTestHelper(Subscription subscription,
- ServiceConfiguration serviceConfig) {
+ ServiceConfiguration serviceConfig,
+ DispatchRateLimiter rateLimiter) {
super(subscription, serviceConfig);
+ dispatchRateLimiter = Optional.ofNullable(rateLimiter);
+ }
+
+ @Override
+ public Optional<DispatchRateLimiter> getRateLimiter() {
+ return dispatchRateLimiter;
}
@Override