You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/05/28 02:53:43 UTC
[pulsar] branch master updated: Sync topicPublishRateLimiter update (#15599)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 51e727f2537 Sync topicPublishRateLimiter update (#15599)
51e727f2537 is described below
commit 51e727f25375fae1fc003370f0b7f113160f7529
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Sat May 28 10:53:34 2022 +0800
Sync topicPublishRateLimiter update (#15599)
---
.../pulsar/broker/service/AbstractTopic.java | 41 ++++++++++++----------
1 file changed, 22 insertions(+), 19 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index edc7bc89647..19116d60566 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -110,6 +110,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
protected volatile Boolean isAllowAutoUpdateSchema;
protected volatile PublishRateLimiter topicPublishRateLimiter;
+ private final Object topicPublishRateLimiterLock = new Object();
protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
@@ -1192,32 +1193,34 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
* update topic publish dispatcher for this topic.
*/
public void updatePublishDispatcher() {
- PublishRate publishRate = topicPolicies.getPublishRate().get();
- if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
- log.info("Enabling publish rate limiting {} ", publishRate);
- if (!preciseTopicPublishRateLimitingEnable) {
- this.brokerService.setupTopicPublishRateLimiterMonitor();
- }
+ synchronized (topicPublishRateLimiterLock) {
+ PublishRate publishRate = topicPolicies.getPublishRate().get();
+ if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
+ log.info("Enabling publish rate limiting {} ", publishRate);
+ if (!preciseTopicPublishRateLimitingEnable) {
+ this.brokerService.setupTopicPublishRateLimiterMonitor();
+ }
- if (this.topicPublishRateLimiter == null
+ if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
- // create new rateLimiter if rate-limiter is disabled
- if (preciseTopicPublishRateLimitingEnable) {
- this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
+ // create new rateLimiter if rate-limiter is disabled
+ if (preciseTopicPublishRateLimitingEnable) {
+ this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
() -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
+ } else {
+ this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
+ }
} else {
- this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
+ this.topicPublishRateLimiter.update(publishRate);
}
} else {
- this.topicPublishRateLimiter.update(publishRate);
- }
- } else {
- log.info("Disabling publish throttling for {}", this.topic);
- if (topicPublishRateLimiter != null) {
- topicPublishRateLimiter.close();
+ log.info("Disabling publish throttling for {}", this.topic);
+ if (topicPublishRateLimiter != null) {
+ topicPublishRateLimiter.close();
+ }
+ this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
+ enableProducerReadForPublishRateLimiting();
}
- this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
- enableProducerReadForPublishRateLimiting();
}
}