You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/11 13:23:30 UTC

[pulsar] branch master updated: Use HierarchyTopicPolicies to update topic-level subscribeRate (#15498)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f87cbedf369 Use HierarchyTopicPolicies to update topic-level subscribeRate (#15498)
f87cbedf369 is described below

commit f87cbedf36994754e897280f21fa8b78b81eba6b
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed May 11 21:23:22 2022 +0800

    Use HierarchyTopicPolicies to update topic-level subscribeRate (#15498)
---
 .../broker/service/persistent/PersistentTopic.java    | 19 +------------------
 1 file changed, 1 insertion(+), 18 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 73ff5e292ae..4074ffd7fa4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3046,7 +3046,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
         FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
             updatePublishDispatcher();
-            initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
+            updateSubscribeRateLimiter();
             if (this.subscribeRateLimiter.isPresent()) {
                 subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
                         subscribeRateLimiter.onSubscribeRateUpdate(getSubscribeRate()));
@@ -3079,23 +3079,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         }
     }
 
-    private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
-        if (!policies.isPresent()) {
-            return;
-        }
-        synchronized (subscribeRateLimiter) {
-            if (!subscribeRateLimiter.isPresent()
-                    && policies.get().getSubscribeRate() != null
-                    && policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer > 0) {
-                this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
-            } else if (!policies.get().isSubscribeRateSet()
-                    || policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
-                subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
-                this.subscribeRateLimiter = Optional.empty();
-            }
-        }
-    }
-
     protected CompletableFuture<Void> initTopicPolicy() {
         if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
                 && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {