You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/11 13:23:30 UTC
[pulsar] branch master updated: Use HierarchyTopicPolicies to update topic-level subscribeRate (#15498)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f87cbedf369 Use HierarchyTopicPolicies to update topic-level subscribeRate (#15498)
f87cbedf369 is described below
commit f87cbedf36994754e897280f21fa8b78b81eba6b
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed May 11 21:23:22 2022 +0800
Use HierarchyTopicPolicies to update topic-level subscribeRate (#15498)
---
.../broker/service/persistent/PersistentTopic.java | 19 +------------------
1 file changed, 1 insertion(+), 18 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 73ff5e292ae..4074ffd7fa4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -3046,7 +3046,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
updatePublishDispatcher();
- initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
+ updateSubscribeRateLimiter();
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
subscribeRateLimiter.onSubscribeRateUpdate(getSubscribeRate()));
@@ -3079,23 +3079,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
}
- private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
- if (!policies.isPresent()) {
- return;
- }
- synchronized (subscribeRateLimiter) {
- if (!subscribeRateLimiter.isPresent()
- && policies.get().getSubscribeRate() != null
- && policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer > 0) {
- this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
- } else if (!policies.get().isSubscribeRateSet()
- || policies.get().getSubscribeRate().subscribeThrottlingRatePerConsumer <= 0) {
- subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
- this.subscribeRateLimiter = Optional.empty();
- }
- }
- }
-
protected CompletableFuture<Void> initTopicPolicy() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled()
&& brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {