You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:38 UTC
[pulsar] 08/17: Sync topicPublishRateLimiter update (#15599)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1e026944556a4169047d1c11d538e229ea934923
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Sat May 28 10:53:34 2022 +0800
Sync topicPublishRateLimiter update (#15599)
(cherry picked from commit 51e727f25375fae1fc003370f0b7f113160f7529)
---
.../pulsar/broker/service/AbstractTopic.java | 41 ++++++++++++----------
1 file changed, 22 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 04fb3ca952c..4ed89908651 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -109,6 +109,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
protected volatile boolean schemaValidationEnforced = false;
protected volatile PublishRateLimiter topicPublishRateLimiter;
+ private final Object topicPublishRateLimiterLock = new Object();
protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
@@ -1079,32 +1080,34 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
* update topic publish dispatcher for this topic.
*/
public void updatePublishDispatcher() {
- PublishRate publishRate = topicPolicies.getPublishRate().get();
- if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
- log.info("Enabling publish rate limiting {} ", publishRate);
- if (!preciseTopicPublishRateLimitingEnable) {
- this.brokerService.setupTopicPublishRateLimiterMonitor();
- }
+ synchronized (topicPublishRateLimiterLock) {
+ PublishRate publishRate = topicPolicies.getPublishRate().get();
+ if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
+ log.info("Enabling publish rate limiting {} ", publishRate);
+ if (!preciseTopicPublishRateLimitingEnable) {
+ this.brokerService.setupTopicPublishRateLimiterMonitor();
+ }
- if (this.topicPublishRateLimiter == null
+ if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
- // create new rateLimiter if rate-limiter is disabled
- if (preciseTopicPublishRateLimitingEnable) {
- this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
+ // create new rateLimiter if rate-limiter is disabled
+ if (preciseTopicPublishRateLimitingEnable) {
+ this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
() -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
+ } else {
+ this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
+ }
} else {
- this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
+ this.topicPublishRateLimiter.update(publishRate);
}
} else {
- this.topicPublishRateLimiter.update(publishRate);
- }
- } else {
- log.info("Disabling publish throttling for {}", this.topic);
- if (topicPublishRateLimiter != null) {
- topicPublishRateLimiter.close();
+ log.info("Disabling publish throttling for {}", this.topic);
+ if (topicPublishRateLimiter != null) {
+ topicPublishRateLimiter.close();
+ }
+ this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
+ enableProducerReadForPublishRateLimiting();
}
- this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
- enableProducerReadForPublishRateLimiting();
}
}