You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/26 03:58:12 UTC

[pulsar] branch master updated: Optimized namespace-level dispatchRateLimiter update (#15315)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 74eae1a729c Optimized namespace-level dispatchRateLimiter update (#15315)
74eae1a729c is described below

commit 74eae1a729c33c9fead9d54594bdee5fac8ab153
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Tue Apr 26 11:58:06 2022 +0800

    Optimized namespace-level dispatchRateLimiter update (#15315)
---
 .../pulsar/broker/service/persistent/PersistentTopic.java     | 11 ++---------
 1 file changed, 2 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index eed8ba0d3b8..bfe0b644109 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2398,9 +2398,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         schemaValidationEnforced = data.schema_validation_enforced;
 
-        //If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies();
-
         initializeRateLimiterIfNeeded();
 
         updatePublishDispatcher();
@@ -2418,12 +2415,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
                 subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
                     Dispatcher dispatcher = sub.getDispatcher();
-                    // If the topic-level policy already exists, the namespace-level policy cannot override
-                    // the topic-level policy.
-                    if (dispatcher != null && (!topicPolicies.isPresent() || !topicPolicies.get()
-                            .isSubscriptionDispatchRateSet())) {
-                        dispatcher.getRateLimiter()
-                                .ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
+                    if (dispatcher != null) {
+                        dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate);
                     }
                 }));
             });