You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/26 03:58:12 UTC
[pulsar] branch master updated: Optimized namespace-level dispatchRateLimiter update (#15315)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 74eae1a729c Optimized namespace-level dispatchRateLimiter update (#15315)
74eae1a729c is described below
commit 74eae1a729c33c9fead9d54594bdee5fac8ab153
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Tue Apr 26 11:58:06 2022 +0800
Optimized namespace-level dispatchRateLimiter update (#15315)
---
.../pulsar/broker/service/persistent/PersistentTopic.java | 11 ++---------
1 file changed, 2 insertions(+), 9 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index eed8ba0d3b8..bfe0b644109 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2398,9 +2398,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
schemaValidationEnforced = data.schema_validation_enforced;
- //If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
- Optional<TopicPolicies> topicPolicies = getTopicPolicies();
-
initializeRateLimiterIfNeeded();
updatePublishDispatcher();
@@ -2418,12 +2415,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync()));
subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> {
Dispatcher dispatcher = sub.getDispatcher();
- // If the topic-level policy already exists, the namespace-level policy cannot override
- // the topic-level policy.
- if (dispatcher != null && (!topicPolicies.isPresent() || !topicPolicies.get()
- .isSubscriptionDispatchRateSet())) {
- dispatcher.getRateLimiter()
- .ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
+ if (dispatcher != null) {
+ dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate);
}
}));
});