You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/29 05:02:49 UTC

[pulsar] branch master updated: [feat][broker] Add config to count filtered entries towards rate limits (#17686)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new db26073728b [feat][broker] Add config to count filtered entries towards rate limits (#17686)
db26073728b is described below

commit db26073728bf86fc80deecaece2dc02b50bbb9b5
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Wed Sep 28 22:02:41 2022 -0700

    [feat][broker] Add config to count filtered entries towards rate limits (#17686)
    
    * [feat][broker] Add config to count filtered entries towards rate limits
    
    * Make fixes for checkstyle
    
    * Remove * import
    
    * Fix incorrect conflict resolution in merge commit
    
    ### Motivation
    
    Currently, when using entry filters, filtered out messages do not count against the rate limit. Therefore, a subscription that is completely filtered will never be throttled due to rate limiting. When the messages are delivered to the consumer for a filtered subscription, those messages will count against the rate limit, and in that case, the message filtering can be throttled because the check to delay `readMoreEntries()` happens before message filtering. Therefore, the rate limit wi [...]
    
    It's possible that some use cases prefer this behavior, but in my case, I think it'd be valuable to include these filtered messages in the dispatch throttling because these messages still cost the broker network, memory, and cpu. This PR adds a configuration to count filtered out messages towards dispatch rate limits for the broker, the topic, and the subscription.
    
    ### Modifications
    
    * Add configuration named `dispatchThrottlingForFilteredEntriesEnabled`. Default it to false so we maintain the original behavior. When true, count filtered messages against rate limits.
    * Refactor the code to `acquirePermitsForDeliveredMessages` so that it is in the `AbstractBaseDispatcher`, which makes it available to the entry filtering logic.
    
    ### Verifying this change
    
    A new test is added as part of this PR.
    
    ### Does this pull request potentially affect one of the following parts:
    
    This PR introduces a new config while maintaining the current behavior.
    
    ### Documentation
    
    - [x] `doc-not-needed`
    Config docs are auto-generated.
---
 conf/broker.conf                                   |  6 ++++
 .../apache/pulsar/broker/ServiceConfiguration.java | 10 +++++++
 .../broker/service/AbstractBaseDispatcher.java     | 27 ++++++++++++++++++
 .../PersistentDispatcherMultipleConsumers.java     | 21 ++------------
 .../PersistentDispatcherSingleActiveConsumer.java  | 19 ++-----------
 ...istentStickyKeyDispatcherMultipleConsumers.java | 14 +--------
 .../broker/service/AbstractBaseDispatcherTest.java | 33 ++++++++++++++++++----
 7 files changed, 75 insertions(+), 55 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 30e79ebc9f0..d117d679c85 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -453,6 +453,12 @@ entryFilterNames=
 # The directory for all the entry filter implementations
 entryFiltersDirectory=
 
+# Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled,
+# only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and
+# subscription level. When enabled, messages filtered out due to entry filter logic are counted towards
+# each relevant rate limit.
+dispatchThrottlingForFilteredEntriesEnabled=false
+
 # Whether allow topic level entry filters policies overrides broker configuration.
 allowOverrideEntryFilters=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a6e9a556820..8c883045e66 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1050,6 +1050,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private boolean dispatcherDispatchMessagesInSubscriptionThread = true;
 
+    @FieldContext(
+        dynamic = false,
+        category = CATEGORY_SERVER,
+        doc = "Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, "
+            + "only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and "
+            + "subscription level. When enabled, messages filtered out due to entry filter logic are counted towards "
+            + "each relevant rate limit."
+    )
+    private boolean dispatchThrottlingForFilteredEntriesEnabled = false;
+
     // <-- dispatcher read settings -->
     @FieldContext(
         dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 29710067a61..df02bbd85d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -107,6 +107,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
         long totalBytes = 0;
         int totalChunkedMessages = 0;
         int totalEntries = 0;
+        int filteredMessageCount = 0;
+        int filteredEntryCount = 0;
+        long filteredBytesCount = 0;
         final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters);
         List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
         List<PositionImpl> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
@@ -135,6 +138,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
                 // FilterResult will be always `ACCEPTED` when there is No Filter
                 // dont need to judge whether `hasFilter` is true or not.
                 this.filterRejectedMsgs.add(entryMsgCnt);
+                filteredEntryCount++;
+                filteredMessageCount += entryMsgCnt;
+                filteredBytesCount += metadataAndPayload.readableBytes();
                 entry.release();
                 continue;
             } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
@@ -143,6 +149,9 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
                 // FilterResult will be always `ACCEPTED` when there is No Filter
                 // dont need to judge whether `hasFilter` is true or not.
                 this.filterRescheduledMsgs.add(entryMsgCnt);
+                filteredEntryCount++;
+                filteredMessageCount += entryMsgCnt;
+                filteredBytesCount += metadataAndPayload.readableBytes();
                 entry.release();
                 continue;
             }
@@ -231,6 +240,11 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
 
         }
 
+        if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
+            acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount,
+                    filteredMessageCount, filteredBytesCount);
+        }
+
         sendMessageInfo.setTotalMessages(totalMessages);
         sendMessageInfo.setTotalBytes(totalBytes);
         sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
@@ -243,6 +257,19 @@ public abstract class AbstractBaseDispatcher extends EntryFilterSupport implemen
         }
     }
 
+    protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cursor, long totalEntries,
+                                                      long totalMessagesSent, long totalBytesSent) {
+        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled()
+                || (cursor != null && !cursor.isActive())) {
+            long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
+            topic.getBrokerDispatchRateLimiter().ifPresent(rateLimiter ->
+                    rateLimiter.tryDispatchPermit(permits, totalBytesSent));
+            topic.getDispatchRateLimiter().ifPresent(rateLimter ->
+                    rateLimter.tryDispatchPermit(permits, totalBytesSent));
+            getRateLimiter().ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(permits, totalBytesSent));
+        }
+    }
+
     /**
      * Determine whether the number of consumers on the subscription reaches the threshold.
      * @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 02d2e725379..15b42fedd38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -684,7 +684,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
-        acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
+        acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
 
         if (entriesToDispatch > 0) {
             if (log.isDebugEnabled()) {
@@ -700,23 +700,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
         return true;
     }
 
-    private void acquirePermitsForDeliveredMessages(long totalEntries, long totalMessagesSent, long totalBytesSent) {
-        // acquire message-dispatch permits for already delivered messages
-        long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
-        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-            if (topic.getBrokerDispatchRateLimiter().isPresent()) {
-                topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
-            }
-            if (topic.getDispatchRateLimiter().isPresent()) {
-                topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
-            }
-
-            if (dispatchRateLimiter.isPresent()) {
-                dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
-            }
-        }
-    }
-
     private boolean sendChunkedMessagesToConsumers(ReadType readType,
                                                    List<Entry> entries,
                                                    MessageMetadata[] metadataArray) {
@@ -775,7 +758,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
-        acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
+        acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
 
         return numConsumers.get() == 0; // trigger a new readMoreEntries() call
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index accab20d2da..3ba7a82aa5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -221,23 +221,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                     redeliveryTracker, epoch)
             .addListener(future -> {
                 if (future.isSuccess()) {
-                    int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size()
-                            : sendMessageInfo.getTotalMessages();
-                    // acquire message-dispatch permits for already delivered messages
-                    if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                        if (topic.getBrokerDispatchRateLimiter().isPresent()) {
-                            topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits,
-                                    sendMessageInfo.getTotalBytes());
-                        }
-
-                        if (topic.getDispatchRateLimiter().isPresent()) {
-                            topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
-                                    sendMessageInfo.getTotalBytes());
-                        }
-                        dispatchRateLimiter.ifPresent(rateLimiter ->
-                                rateLimiter.tryDispatchPermit(permits,
-                                        sendMessageInfo.getTotalBytes()));
-                    }
+                    acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
+                            sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
 
                     // Schedule a new read batch operation only after the previous batch has been written to the socket.
                     topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 024ed8581ef..5eb553106e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -296,19 +296,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         }
 
         // acquire message-dispatch permits for already delivered messages
-        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-            long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
-            if (topic.getBrokerDispatchRateLimiter().isPresent()) {
-                topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
-            }
-            if (topic.getDispatchRateLimiter().isPresent()) {
-                topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
-            }
-
-            if (dispatchRateLimiter.isPresent()) {
-                dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
-            }
-        }
+        acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
 
         stuckConsumers.clear();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index b129995a8cc..cba15b06310 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import io.netty.buffer.ByteBuf;
@@ -29,11 +30,14 @@ import io.netty.buffer.Unpooled;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
@@ -60,8 +64,9 @@ public class AbstractBaseDispatcherTest {
     @BeforeMethod
     public void setup() throws Exception {
         this.svcConfig = mock(ServiceConfiguration.class);
+        when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(true);
         this.subscriptionMock = mock(PersistentSubscription.class);
-        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, null);
     }
 
     @Test
@@ -89,17 +94,24 @@ public class AbstractBaseDispatcherTest {
                 EntryFilter.FilterResult.REJECT);
         Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key", mockFilter);
         when(mockTopic.getEntryFilters()).thenReturn(entryFilters);
+        DispatchRateLimiter subscriptionDispatchRateLimiter = mock(DispatchRateLimiter.class);
 
-        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig,
+                subscriptionDispatchRateLimiter);
 
         List<Entry> entries = new ArrayList<>();
 
-        entries.add(EntryImpl.create(1, 2, createMessage("message1", 1)));
+        Entry e = EntryImpl.create(1, 2, createMessage("message1", 1));
+        long expectedBytePermits = e.getLength();
+        entries.add(e);
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-        //
-        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
+
+        ManagedCursor cursor = mock(ManagedCursor.class);
+
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, cursor, false, null);
         assertEquals(size, 0);
+        verify(subscriptionDispatchRateLimiter).tryDispatchPermit(1, expectedBytePermits);
     }
 
     @Test
@@ -201,9 +213,18 @@ public class AbstractBaseDispatcherTest {
 
     private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {
 
+        private final Optional<DispatchRateLimiter> dispatchRateLimiter;
+
         protected AbstractBaseDispatcherTestHelper(Subscription subscription,
-                                                   ServiceConfiguration serviceConfig) {
+                                                   ServiceConfiguration serviceConfig,
+                                                   DispatchRateLimiter rateLimiter) {
             super(subscription, serviceConfig);
+            dispatchRateLimiter = Optional.ofNullable(rateLimiter);
+        }
+
+        @Override
+        public Optional<DispatchRateLimiter> getRateLimiter() {
+            return dispatchRateLimiter;
         }
 
         @Override