You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 06:25:12 UTC

[pulsar] branch branch-2.10 updated: [Branch-2.10][Cherry-pick] tidy update subscriptions dispatcher rate-limiter (#16778)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new a88e96ec9c4 [Branch-2.10][Cherry-pick] tidy update subscriptions dispatcher rate-limiter (#16778)
a88e96ec9c4 is described below

commit a88e96ec9c466620bac478ed4fc5eff4c88725ff
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Aug 8 14:25:05 2022 +0800

    [Branch-2.10][Cherry-pick] tidy update subscriptions dispatcher rate-limiter (#16778)
---
 .../pulsar/broker/service/AbstractTopic.java       |  58 +++++++++
 .../pulsar/broker/service/BrokerService.java       |  38 ++++++
 .../org/apache/pulsar/broker/service/Topic.java    |   5 +
 .../service/persistent/DispatchRateLimiter.java    |   3 +
 .../broker/service/persistent/PersistentTopic.java | 116 +++++++++---------
 .../service/persistent/SubscribeRateLimiter.java   |  76 +-----------
 .../PrecisTopicPublishRateThrottleTest.java        |  42 +++++++
 .../pulsar/broker/service/ServerCnxTest.java       |   5 +
 .../pulsar/broker/service/SubscribeRateTest.java   | 131 +++++++++++++++++++++
 .../pulsar/common/policies/data/SubscribeRate.java |  10 ++
 .../policies/data/HierarchyTopicPolicies.java      |   4 +
 11 files changed, 354 insertions(+), 134 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 917dec0b423..f9efa8ea186 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -65,6 +65,7 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -157,6 +158,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable();
     }
 
+    public SubscribeRate getSubscribeRate() {
+        return this.topicPolicies.getSubscribeRate().get();
+    }
+
     public DispatchRateImpl getSubscriptionDispatchRate() {
         return this.topicPolicies.getSubscriptionDispatchRate().get();
     }
@@ -169,6 +174,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         return this.topicPolicies.getReplicatorDispatchRate().get();
     }
 
+    public DispatchRateImpl getDispatchRate() {
+        return this.topicPolicies.getDispatchRate().get();
+    }
+
     private SchemaCompatibilityStrategy formatSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
         return strategy == SchemaCompatibilityStrategy.UNDEFINED ? null : strategy;
     }
@@ -204,8 +213,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
         topicPolicies.getReplicatorDispatchRate().updateTopicValue(normalize(data.getReplicatorDispatchRate()));
         topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
+        topicPolicies.getSubscribeRate().updateTopicValue(SubscribeRate.normalize(data.getSubscribeRate()));
         topicPolicies.getSubscriptionDispatchRate().updateTopicValue(normalize(data.getSubscriptionDispatchRate()));
         topicPolicies.getCompactionThreshold().updateTopicValue(data.getCompactionThreshold());
+        topicPolicies.getDispatchRate().updateTopicValue(normalize(data.getDispatchRate()));
     }
 
     protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
@@ -247,9 +258,24 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
                 type -> this.topicPolicies.getBackLogQuotaMap().get(type)
                         .updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type)));
+        updateNamespaceSubscribeRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
         updateNamespaceSubscriptionDispatchRate(namespacePolicies,
             brokerService.getPulsar().getConfig().getClusterName());
         updateSchemaCompatibilityStrategyNamespaceValue(namespacePolicies);
+        updateNamespaceDispatchRate(namespacePolicies, brokerService.getPulsar().getConfig().getClusterName());
+    }
+
+    private void updateNamespaceDispatchRate(Policies namespacePolicies, String cluster) {
+        DispatchRateImpl dispatchRate = namespacePolicies.topicDispatchRate.get(cluster);
+        if (dispatchRate == null) {
+            dispatchRate = namespacePolicies.clusterDispatchRate.get(cluster);
+        }
+        topicPolicies.getDispatchRate().updateNamespaceValue(normalize(dispatchRate));
+    }
+
+    private void updateNamespaceSubscribeRate(Policies namespacePolicies, String cluster) {
+        topicPolicies.getSubscribeRate()
+            .updateNamespaceValue(SubscribeRate.normalize(namespacePolicies.clusterSubscribeRate.get(cluster)));
     }
 
     private void updateNamespaceSubscriptionDispatchRate(Policies namespacePolicies, String cluster) {
@@ -335,9 +361,26 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         if (isSystemTopic()) {
             schemaCompatibilityStrategy = config.getSystemTopicSchemaCompatibilityStrategy();
         }
+        topicPolicies.getSubscribeRate().updateBrokerValue(subscribeRateInBroker(config));
         topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(subscriptionDispatchRateInBroker(config));
         topicPolicies.getSchemaCompatibilityStrategy()
                 .updateBrokerValue(formatSchemaCompatibilityStrategy(schemaCompatibilityStrategy));
+        topicPolicies.getDispatchRate().updateBrokerValue(dispatchRateInBroker(config));
+    }
+
+    private DispatchRateImpl dispatchRateInBroker(ServiceConfiguration config) {
+        return DispatchRateImpl.builder()
+                .dispatchThrottlingRateInMsg(config.getDispatchThrottlingRatePerTopicInMsg())
+                .dispatchThrottlingRateInByte(config.getDispatchThrottlingRatePerTopicInByte())
+                .ratePeriodInSecond(1)
+                .build();
+    }
+
+    private SubscribeRate subscribeRateInBroker(ServiceConfiguration config) {
+        return new SubscribeRate(
+            config.getSubscribeThrottlingRatePerConsumer(),
+            config.getSubscribeRatePeriodPerConsumerInSecond()
+        );
     }
 
     private DispatchRateImpl subscriptionDispatchRateInBroker(ServiceConfiguration config) {
@@ -1173,4 +1216,19 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
         topicPolicies.getReplicatorDispatchRate().updateBrokerValue(
             replicatorDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
     }
+
+    public void updateBrokerDispatchRate() {
+        topicPolicies.getDispatchRate().updateBrokerValue(
+            dispatchRateInBroker(brokerService.pulsar().getConfiguration()));
+    }
+
+    public void updateBrokerPublishRate() {
+        topicPolicies.getPublishRate().updateBrokerValue(
+            publishRateInBroker(brokerService.pulsar().getConfiguration()));
+    }
+
+    public void updateBrokerSubscribeRate() {
+        topicPolicies.getSubscribeRate().updateBrokerValue(
+            subscribeRateInBroker(brokerService.pulsar().getConfiguration()));
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4b07087759e..dab1c349aed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2216,6 +2216,21 @@ public class BrokerService implements Closeable {
                 (publisherThrottlingTickTimeMillis) -> {
                     setupBrokerPublishRateLimiterMonitor();
                 });
+
+        // add listener to update topic publish-rate dynamic config
+        registerConfigurationListener("maxPublishRatePerTopicInMessages",
+            maxPublishRatePerTopicInMessages -> updateMaxPublishRatePerTopicInMessages()
+        );
+        registerConfigurationListener("maxPublishRatePerTopicInBytes",
+            maxPublishRatePerTopicInMessages -> updateMaxPublishRatePerTopicInMessages()
+        );
+
+        // add listener to update subscribe-rate dynamic config
+        registerConfigurationListener("subscribeThrottlingRatePerConsumer",
+            subscribeThrottlingRatePerConsumer -> updateSubscribeRate());
+        registerConfigurationListener("subscribeRatePeriodPerConsumerInSecond",
+            subscribeRatePeriodPerConsumerInSecond -> updateSubscribeRate());
+
         // add listener to notify broker publish-rate dynamic config
         registerConfigurationListener("brokerPublisherThrottlingMaxMessageRate",
                 (brokerPublisherThrottlingMaxMessageRate) ->
@@ -2252,6 +2267,26 @@ public class BrokerService implements Closeable {
         }
     }
 
+    private void updateMaxPublishRatePerTopicInMessages() {
+        this.pulsar().getExecutor().submit(() ->
+            forEachTopic(topic -> {
+                if (topic instanceof AbstractTopic) {
+                    ((AbstractTopic) topic).updateBrokerPublishRate();
+                    ((AbstractTopic) topic).updatePublishDispatcher();
+                }
+            }));
+    }
+
+    private void updateSubscribeRate() {
+        this.pulsar().getExecutor().submit(() ->
+            forEachTopic(topic -> {
+                if (topic instanceof PersistentTopic) {
+                    ((PersistentTopic) topic).updateBrokerSubscribeRate();
+                    ((PersistentTopic) topic).updateSubscribeRateLimiter();
+                }
+            }));
+    }
+
     private void updateBrokerPublisherThrottlingMaxRate() {
         int currentMaxMessageRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate();
         long currentMaxByteRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate();
@@ -2284,6 +2319,9 @@ public class BrokerService implements Closeable {
         this.pulsar().getExecutor().execute(() -> {
             // update message-rate for each topic
             forEachTopic(topic -> {
+                if (topic instanceof AbstractTopic) {
+                    ((AbstractTopic) topic).updateBrokerDispatchRate();
+                }
                 if (topic.getDispatchRateLimiter().isPresent()) {
                     topic.getDispatchRateLimiter().get().updateDispatchRate();
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index fc1eadf47e4..e2ffb41390a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.api.MessageId;
@@ -285,6 +286,10 @@ public interface Topic {
         return Optional.empty();
     }
 
+    default Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
+        return Optional.empty();
+    }
+
     default Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
         return Optional.empty();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 3247494577a..cd7f5e9ea64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -170,6 +170,9 @@ public class DispatchRateLimiter {
      */
     public void updateDispatchRate() {
         switch (type) {
+            case TOPIC:
+                updateDispatchRate(topic.getDispatchRate());
+                return;
             case SUBSCRIPTION:
                 updateDispatchRate(topic.getSubscriptionDispatchRate());
                 return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5be91078b14..51c27bfd8f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter.isSubscribeRateEnabled;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
@@ -135,6 +136,7 @@ import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerI
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -260,7 +262,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 .build();
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
-        initializeRateLimiterIfNeeded(Optional.empty());
         registerTopicPolicyListener();
 
         this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
@@ -316,6 +317,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         isEncryptionRequired = false;
                         updatePublishDispatcher();
                         updateResourceGroupLimiter(optPolicies);
+                        initializeDispatchRateLimiterIfNeeded();
+                        updateSubscribeRateLimiter();
                         return;
                     }
 
@@ -323,6 +326,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
                     this.updateTopicPolicyByNamespacePolicy(policies);
 
+                    initializeDispatchRateLimiterIfNeeded();
+
+                    updateSubscribeRateLimiter();
+
                     updatePublishDispatcher();
 
                     updateResourceGroupLimiter(optPolicies);
@@ -367,27 +374,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
-    private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
+    private void initializeDispatchRateLimiterIfNeeded() {
         synchronized (dispatchRateLimiterLock) {
             // dispatch rate limiter for topic
-            if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
-                    .isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) {
+            if (!dispatchRateLimiter.isPresent()
+                && DispatchRateLimiter.isDispatchRateEnabled(topicPolicies.getDispatchRate().get())) {
                 this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
             }
-            boolean isDispatchRateNeeded = SubscribeRateLimiter.isDispatchRateNeeded(brokerService, policies, topic);
-            if (!subscribeRateLimiter.isPresent() && isDispatchRateNeeded) {
-                this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
-            } else if (!isDispatchRateNeeded) {
-                this.subscribeRateLimiter = Optional.empty();
-            }
-
-            // dispatch rate limiter for each subscription
-            subscriptions.forEach((name, subscription) -> {
-                Dispatcher dispatcher = subscription.getDispatcher();
-                if (dispatcher != null) {
-                    dispatcher.initializeDispatchRateLimiterIfNeeded();
-                }
-            });
         }
     }
 
@@ -434,6 +427,24 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
+    public void updateSubscribeRateLimiter() {
+        SubscribeRate subscribeRate = getSubscribeRate();
+        synchronized (subscribeRateLimiter) {
+            if (isSubscribeRateEnabled(subscribeRate)) {
+                if (subscribeRateLimiter.isPresent()) {
+                    this.subscribeRateLimiter.get().onSubscribeRateUpdate(subscribeRate);
+                } else {
+                    this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
+                }
+            } else {
+                if (subscribeRateLimiter.isPresent()) {
+                    subscribeRateLimiter.get().close();
+                    subscribeRateLimiter = Optional.empty();
+                }
+            }
+        }
+    }
+
     private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
         if (brokerService.isBrokerEntryMetadataEnabled()) {
             ledger.asyncAddEntry(headersAndPayload,
@@ -2375,10 +2386,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         schemaValidationEnforced = data.schema_validation_enforced;
 
-        //If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
-        Optional<TopicPolicies> topicPolicies = getTopicPolicies();
+        initializeDispatchRateLimiterIfNeeded();
 
-        initializeRateLimiterIfNeeded(Optional.ofNullable(data));
+        updateSubscribeRateLimiter();
 
         updatePublishDispatcher();
 
@@ -2389,36 +2399,16 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
 
         return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
-            List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size());
-            subscriptions.forEach((subName, sub) -> {
-                List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size());
-                sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
-                subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
-                    Dispatcher dispatcher = sub.getDispatcher();
-                    // If the topic-level policy already exists, the namespace-level policy cannot override
-                    // the topic-level policy.
-                    if (dispatcher != null && (!topicPolicies.isPresent() || !topicPolicies.get()
-                            .isSubscriptionDispatchRateSet())) {
-                        dispatcher.getRateLimiter()
-                                .ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
-                    }
-                }));
-            });
-
-            return FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___) -> {
+            return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> {
                 replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
                 checkMessageExpiry();
                 CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
                 CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
                 CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
                 // update rate-limiter if policies updated
-                if (this.dispatchRateLimiter.isPresent()) {
-                    if (!topicPolicies.isPresent() || !topicPolicies.get().isDispatchRateSet()) {
-                        dispatchRateLimiter.get().onPoliciesUpdate(data);
-                    }
-                }
+                dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
                 if (this.subscribeRateLimiter.isPresent()) {
-                    subscribeRateLimiter.get().onPoliciesUpdate(data);
+                    subscribeRateLimiter.get().onSubscribeRateUpdate(getSubscribeRate());
                 }
 
                 return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture,
@@ -3039,30 +3029,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         Optional<Policies> namespacePolicies = getNamespacePolicies();
         initializeTopicDispatchRateLimiterIfNeeded(policies);
 
-        dispatchRateLimiter.ifPresent(limiter -> {
-            if (policies.isDispatchRateSet()) {
-                dispatchRateLimiter.get().updateDispatchRate(policies.getDispatchRate());
-            } else {
-                dispatchRateLimiter.get().updateDispatchRate();
-            }
-        });
-
-        List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>();
-        subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(consumer -> {
-            consumerCheckFutures.add(consumer.checkPermissionsAsync().thenRun(() -> {
-                Dispatcher dispatcher = sub.getDispatcher();
-                if (dispatcher != null) {
-                    dispatcher.updateRateLimiter();
-                }
-            }));
-        }));
+        dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
 
-        FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
+        updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
             updatePublishDispatcher();
             initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
             if (this.subscribeRateLimiter.isPresent()) {
                 subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
-                        subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
+                        subscribeRateLimiter.onSubscribeRateUpdate(getSubscribeRate()));
             }
             replicators.forEach((name, replicator) -> replicator.updateRateLimiter());
             checkMessageExpiry();
@@ -3087,6 +3061,21 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         return DispatchRateLimiter.getPolicies(brokerService, topic);
     }
 
+    private CompletableFuture<Void> updateSubscriptionsDispatcherRateLimiter() {
+        List<CompletableFuture<Void>> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size());
+        subscriptions.forEach((subName, sub) -> {
+            List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size());
+            sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
+            subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
+                Dispatcher dispatcher = sub.getDispatcher();
+                if (dispatcher != null) {
+                    dispatcher.updateRateLimiter();
+                }
+            }));
+        });
+        return FutureUtil.waitForAll(subscriptionCheckFutures);
+    }
+
     private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
         synchronized (dispatchRateLimiterLock) {
             if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
@@ -3106,6 +3095,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
             } else if (!policies.get().isSubscribeRateSet()
                     || policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
+                subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
                 this.subscribeRateLimiter = Optional.empty();
             }
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
index 93707b4977b..89af6f6be88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java
@@ -27,12 +27,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
-import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,22 +49,7 @@ public class SubscribeRateLimiter {
         subscribeRateLimiter = new ConcurrentHashMap<>();
         this.executorService = brokerService.pulsar().getExecutor();
         // get subscribeRate from topic level policies
-        this.subscribeRate = topic.getTopicPolicies()
-                .map(TopicPolicies::getSubscribeRate)
-                .orElse(null);
-
-        // subscribeRate of topic level policies not set, get from zookeeper
-        if (this.subscribeRate == null) {
-            this.subscribeRate = getPoliciesSubscribeRate();
-        }
-
-        // get subscribeRate from broker.conf
-        if (this.subscribeRate == null) {
-            this.subscribeRate = new SubscribeRate(brokerService.pulsar()
-                    .getConfiguration().getSubscribeThrottlingRatePerConsumer(),
-                    brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond());
-
-        }
+        this.subscribeRate = topic.getSubscribeRate();
         if (isSubscribeRateEnabled(this.subscribeRate)) {
             resetTask = createTask();
             log.info("[{}] configured subscribe-dispatch rate at broker {}", this.topicName, subscribeRate);
@@ -157,40 +139,9 @@ public class SubscribeRateLimiter {
         }
     }
 
-    public void onPoliciesUpdate(Policies data) {
-        // if subscribe rate is set on topic policy, skip subscribe rate update
-        SubscribeRate subscribeRate = brokerService.getTopicPolicies(TopicName.get(topicName))
-                .map(TopicPolicies::getSubscribeRate)
-                .orElse(null);
-        if (subscribeRate != null) {
-            return;
-        }
-
-        String cluster = brokerService.pulsar().getConfiguration().getClusterName();
-
-        subscribeRate = data.clusterSubscribeRate.get(cluster);
-
-        onSubscribeRateUpdate(subscribeRate);
-
-    }
-
     public void onSubscribeRateUpdate(SubscribeRate subscribeRate) {
-        final SubscribeRate namespacePolicySubscribeRate = getPoliciesSubscribeRate();
-        final SubscribeRate newSubscribeRate = new SubscribeRate(
-                brokerService.pulsar().getConfiguration().getSubscribeThrottlingRatePerConsumer(),
-                brokerService.pulsar().getConfiguration().getSubscribeRatePeriodPerConsumerInSecond()
-                );
-
-        // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
-        // cluster-throttling rate
-        // if topic policy-throttling rate is disabled
-        if (!isSubscribeRateEnabled(subscribeRate) && isSubscribeRateEnabled(namespacePolicySubscribeRate)) {
-            subscribeRate = namespacePolicySubscribeRate;
-        }
-
-        if (!isSubscribeRateEnabled(subscribeRate) && !isSubscribeRateEnabled(namespacePolicySubscribeRate)
-                && isSubscribeRateEnabled(newSubscribeRate)) {
-            subscribeRate = newSubscribeRate;
+        if (this.subscribeRate.equals(subscribeRate)) {
+            return;
         }
         this.subscribeRate = subscribeRate;
         stopResetTask();
@@ -216,23 +167,6 @@ public class SubscribeRateLimiter {
         return getPoliciesSubscribeRate(brokerService, topicName);
     }
 
-    public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
-            String topicName) {
-        ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
-        policies = policies.isPresent() ? policies : DispatchRateLimiter.getPolicies(brokerService, topicName);
-        return isDispatchRateNeeded(serviceConfig, policies, topicName);
-    }
-
-    private static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
-            final Optional<Policies> policies, final String topicName) {
-        SubscribeRate subscribeRate = getPoliciesSubscribeRate(serviceConfig.getClusterName(), policies, topicName);
-        if (subscribeRate == null) {
-            return serviceConfig.getSubscribeThrottlingRatePerConsumer() > 0
-                    && serviceConfig.getSubscribeRatePeriodPerConsumerInSecond() > 0;
-        }
-        return true;
-    }
-
     public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) {
         final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
         final Optional<Policies> policies = DispatchRateLimiter.getPolicies(brokerService, topicName);
@@ -262,8 +196,8 @@ public class SubscribeRateLimiter {
                 != null ? subscribeRateLimiter.get(consumerIdentifier).getRate() : -1;
     }
 
-    private static boolean isSubscribeRateEnabled(SubscribeRate subscribeRate) {
-        return subscribeRate != null && (subscribeRate.subscribeThrottlingRatePerConsumer > 0);
+    public static boolean isSubscribeRateEnabled(SubscribeRate subscribeRate) {
+        return subscribeRate.subscribeThrottlingRatePerConsumer > 0 && subscribeRate.ratePeriodInSecond > 0;
     }
 
     public void close() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
index 4a40bc162de..82356055031 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisTopicPublishRateThrottleTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service;
 
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.policies.data.PublishRate;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -140,4 +141,45 @@ public class PrecisTopicPublishRateThrottleTest extends BrokerTestBase{
         Assert.assertNotNull(messageId);
         super.internalCleanup();
     }
+
+    @Test
+    public void testBrokerLevelPublishRateDynamicUpdate() throws Exception{
+        conf.setPreciseTopicPublishRateLimiterEnable(true);
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+        super.baseSetup();
+        final String topic = "persistent://prop/ns-abc/testMultiLevelPublishRate";
+        org.apache.pulsar.client.api.Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .producerName("producer-name")
+            .create();
+
+        final int rateInMsg = 10;
+        final long rateInByte = 20;
+
+        // maxPublishRatePerTopicInMessages
+        admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages", "" + rateInMsg);
+        Awaitility.await()
+            .untilAsserted(() ->
+                Assert.assertEquals(admin.brokers().getAllDynamicConfigurations().get("maxPublishRatePerTopicInMessages"),
+                    "" + rateInMsg));
+        Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+        Assert.assertNotNull(topicRef);
+        PrecisPublishLimiter limiter = ((PrecisPublishLimiter) ((AbstractTopic) topicRef).topicPublishRateLimiter);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(limiter.publishMaxMessageRate, rateInMsg));
+        Assert.assertEquals(limiter.publishMaxByteRate, 0);
+
+        // maxPublishRatePerTopicInBytes
+        admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", "" + rateInByte);
+        Awaitility.await()
+            .untilAsserted(() ->
+                Assert.assertEquals(admin.brokers().getAllDynamicConfigurations().get("maxPublishRatePerTopicInBytes"),
+                    "" + rateInByte));
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(limiter.publishMaxByteRate, rateInByte));
+        Assert.assertEquals(limiter.publishMaxMessageRate, rateInMsg);
+
+        producer.close();
+        super.internalCleanup();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 12a4f0db18d..3a38994fa25 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -1393,6 +1393,7 @@ public class ServerCnxTest {
         Policies policies = mock(Policies.class);
         policies.encryption_required = true;
         policies.topicDispatchRate = Maps.newHashMap();
+        policies.clusterSubscribeRate = Maps.newHashMap();
         // add `clusterDispatchRate` otherwise there will be a NPE
         // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
         policies.clusterDispatchRate = Maps.newHashMap();
@@ -1428,6 +1429,7 @@ public class ServerCnxTest {
         Policies policies = mock(Policies.class);
         policies.encryption_required = true;
         policies.topicDispatchRate = Maps.newHashMap();
+        policies.clusterSubscribeRate = Maps.newHashMap();
         // add `clusterDispatchRate` otherwise there will be a NPE
         // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
         policies.clusterDispatchRate = Maps.newHashMap();
@@ -1468,6 +1470,7 @@ public class ServerCnxTest {
         // Namespace policy doesn't require encryption
         policies.encryption_required = false;
         policies.topicDispatchRate = Maps.newHashMap();
+        policies.clusterSubscribeRate = Maps.newHashMap();
         // add `clusterDispatchRate` otherwise there will be a NPE
         policies.clusterDispatchRate = Maps.newHashMap();
         // add `clusterDispatchRate` otherwise there will be a NPE
@@ -1504,6 +1507,7 @@ public class ServerCnxTest {
         Policies policies = mock(Policies.class);
         policies.encryption_required = true;
         policies.topicDispatchRate = Maps.newHashMap();
+        policies.clusterSubscribeRate = Maps.newHashMap();
         // add `clusterDispatchRate` otherwise there will be a NPE
         // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
         policies.clusterDispatchRate = Maps.newHashMap();
@@ -1547,6 +1551,7 @@ public class ServerCnxTest {
         Policies policies = mock(Policies.class);
         policies.encryption_required = true;
         policies.topicDispatchRate = Maps.newHashMap();
+        policies.clusterSubscribeRate = Maps.newHashMap();
         // add `clusterDispatchRate` otherwise there will be a NPE
         // `org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.getPoliciesDispatchRate`
         policies.clusterDispatchRate = Maps.newHashMap();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
new file mode 100644
index 00000000000..547fbe354f1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscribeRateTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import java.util.Optional;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.persistent.SubscribeRateLimiter;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class SubscribeRateTest extends BrokerTestBase {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setMaxPendingPublishRequestsPerConnection(0);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testBrokerLevelSubscribeRateDynamicUpdate() throws Exception {
+        final String topic = "persistent://prop/ns-abc/testBrokerLevelSubscribeRateDynamicUpdate";
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .producerName("producer-name")
+            .create();
+
+        Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+        Assert.assertNotNull(topicRef);
+        Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent());
+
+        final int ratePerConsumer = 10;
+        final int ratePeriod = 60;
+
+        String defaultRatePerConsumer = admin.brokers().getRuntimeConfigurations().get("subscribeThrottlingRatePerConsumer");
+        String defaultRatePeriod = admin.brokers().getRuntimeConfigurations().get("subscribeRatePeriodPerConsumerInSecond");
+        Assert.assertNotNull(defaultRatePerConsumer);
+        Assert.assertNotNull(defaultRatePeriod);
+        Assert.assertNotEquals(ratePerConsumer, Integer.parseInt(defaultRatePerConsumer));
+        Assert.assertNotEquals(ratePeriod, Integer.parseInt(defaultRatePeriod));
+
+        // subscribeThrottlingRatePerConsumer
+        admin.brokers().updateDynamicConfiguration("subscribeThrottlingRatePerConsumer", ratePerConsumer + "");
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(topicRef.getSubscribeRateLimiter().isPresent()));
+        SubscribeRateLimiter limiter = topicRef.getSubscribeRateLimiter().get();
+        Assert.assertEquals(limiter.getSubscribeRate().subscribeThrottlingRatePerConsumer, ratePerConsumer);
+        Assert.assertEquals(limiter.getSubscribeRate().ratePeriodInSecond, 30);
+
+        // subscribeRatePeriodPerConsumerInSecond
+        admin.brokers().updateDynamicConfiguration("subscribeRatePeriodPerConsumerInSecond", ratePeriod + "");
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(limiter.getSubscribeRate().ratePeriodInSecond, ratePeriod));
+        Assert.assertEquals(limiter.getSubscribeRate().subscribeThrottlingRatePerConsumer, ratePerConsumer);
+
+        producer.close();
+    }
+
+    @Test
+    public void testUpdateSubscribeRateLimiter() throws Exception {
+
+        final String topic = "persistent://prop/ns-abc/testUpdateSubscribeRateLimiter";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+            .topic(topic)
+            .producerName("producer-name")
+            .create();
+
+        Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
+        Assert.assertNotNull(topicRef);
+        Assert.assertTrue(topicRef instanceof PersistentTopic);
+        Assert.assertFalse(topicRef.getSubscribeRateLimiter().isPresent());
+
+        // init
+        PersistentTopic persistentTopic = spy(((PersistentTopic) topicRef));
+        when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(10, 60));
+        persistentTopic.updateSubscribeRateLimiter();
+
+        Optional<SubscribeRateLimiter> limiter1 = persistentTopic.getSubscribeRateLimiter();
+        Assert.assertTrue(limiter1.isPresent());
+        Assert.assertEquals(limiter1.get().getSubscribeRate(), new SubscribeRate(10, 60));
+
+        // update
+        when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(20, 120));
+        persistentTopic.updateSubscribeRateLimiter();
+
+        Optional<SubscribeRateLimiter> limiter2 = persistentTopic.getSubscribeRateLimiter();
+        Assert.assertTrue(limiter2.isPresent());
+        Assert.assertEquals(limiter2.get().getSubscribeRate(), new SubscribeRate(20, 120));
+
+        Assert.assertSame(limiter1, limiter2);
+
+        // disable
+        when(persistentTopic.getSubscribeRate()).thenReturn(new SubscribeRate(0, 0));
+        persistentTopic.updateSubscribeRateLimiter();
+
+        Optional<SubscribeRateLimiter> limiter3 = persistentTopic.getSubscribeRateLimiter();
+        Assert.assertFalse(limiter3.isPresent());
+    }
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java
index dc553c69267..ea641b44b2a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscribeRate.java
@@ -38,6 +38,16 @@ public class SubscribeRate {
         this.ratePeriodInSecond = ratePeriodInSecond;
     }
 
+    public static SubscribeRate normalize(SubscribeRate subscribeRate) {
+        if (subscribeRate != null
+            && subscribeRate.subscribeThrottlingRatePerConsumer > 0
+            && subscribeRate.ratePeriodInSecond > 0) {
+            return subscribeRate;
+        } else {
+            return null;
+        }
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(subscribeThrottlingRatePerConsumer, ratePeriodInSecond);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 917b9753cc3..0532744bec3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -53,8 +53,10 @@ public class HierarchyTopicPolicies {
     final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
     final PolicyHierarchyValue<DispatchRateImpl> replicatorDispatchRate;
     final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
+    final PolicyHierarchyValue<SubscribeRate> subscribeRate;
     final PolicyHierarchyValue<DispatchRateImpl> subscriptionDispatchRate;
     final PolicyHierarchyValue<SchemaCompatibilityStrategy> schemaCompatibilityStrategy;
+    final PolicyHierarchyValue<DispatchRateImpl> dispatchRate;
 
     public HierarchyTopicPolicies() {
         replicationClusters = new PolicyHierarchyValue<>();
@@ -80,7 +82,9 @@ public class HierarchyTopicPolicies {
         delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
         replicatorDispatchRate = new PolicyHierarchyValue<>();
         compactionThreshold = new PolicyHierarchyValue<>();
+        subscribeRate = new PolicyHierarchyValue<>();
         subscriptionDispatchRate = new PolicyHierarchyValue<>();
         schemaCompatibilityStrategy = new PolicyHierarchyValue<>();
+        dispatchRate = new PolicyHierarchyValue<>();
     }
 }