You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:38 UTC

[pulsar] 08/17: Sync topicPublishRateLimiter update (#15599)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1e026944556a4169047d1c11d538e229ea934923
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Sat May 28 10:53:34 2022 +0800

    Sync topicPublishRateLimiter update (#15599)
    
    (cherry picked from commit 51e727f25375fae1fc003370f0b7f113160f7529)
---
 .../pulsar/broker/service/AbstractTopic.java       | 41 ++++++++++++----------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 04fb3ca952c..4ed89908651 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -109,6 +109,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
     protected volatile boolean schemaValidationEnforced = false;
 
     protected volatile PublishRateLimiter topicPublishRateLimiter;
+    private final Object topicPublishRateLimiterLock = new Object();
 
     protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
 
@@ -1079,32 +1080,34 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
      * update topic publish dispatcher for this topic.
      */
     public void updatePublishDispatcher() {
-        PublishRate publishRate = topicPolicies.getPublishRate().get();
-        if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
-            log.info("Enabling publish rate limiting {} ", publishRate);
-            if (!preciseTopicPublishRateLimitingEnable) {
-                this.brokerService.setupTopicPublishRateLimiterMonitor();
-            }
+        synchronized (topicPublishRateLimiterLock) {
+            PublishRate publishRate = topicPolicies.getPublishRate().get();
+            if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
+                log.info("Enabling publish rate limiting {} ", publishRate);
+                if (!preciseTopicPublishRateLimitingEnable) {
+                    this.brokerService.setupTopicPublishRateLimiterMonitor();
+                }
 
-            if (this.topicPublishRateLimiter == null
+                if (this.topicPublishRateLimiter == null
                     || this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
-                // create new rateLimiter if rate-limiter is disabled
-                if (preciseTopicPublishRateLimitingEnable) {
-                    this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
+                    // create new rateLimiter if rate-limiter is disabled
+                    if (preciseTopicPublishRateLimitingEnable) {
+                        this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
                             () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
+                    } else {
+                        this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
+                    }
                 } else {
-                    this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
+                    this.topicPublishRateLimiter.update(publishRate);
                 }
             } else {
-                this.topicPublishRateLimiter.update(publishRate);
-            }
-        } else {
-            log.info("Disabling publish throttling for {}", this.topic);
-            if (topicPublishRateLimiter != null) {
-                topicPublishRateLimiter.close();
+                log.info("Disabling publish throttling for {}", this.topic);
+                if (topicPublishRateLimiter != null) {
+                    topicPublishRateLimiter.close();
+                }
+                this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
+                enableProducerReadForPublishRateLimiting();
             }
-            this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
-            enableProducerReadForPublishRateLimiting();
         }
     }