You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/09 13:49:09 UTC
[pulsar] branch master updated: [clean][broker]Cleanup DispatchRateLimiter#onPoliciesUpdate #15986
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5cbaddcccb7 [clean][broker]Cleanup DispatchRateLimiter#onPoliciesUpdate #15986
5cbaddcccb7 is described below
commit 5cbaddcccb7183bedb3e98895c4fb34e84415f9a
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Thu Jun 9 21:48:59 2022 +0800
[clean][broker]Cleanup DispatchRateLimiter#onPoliciesUpdate #15986
---
.../service/persistent/DispatchRateLimiter.java | 37 ----------------------
.../client/api/MessageDispatchThrottlingTest.java | 24 ++++++++------
2 files changed, 14 insertions(+), 47 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index e8c14c99685..481a97ee15b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -276,43 +276,6 @@ public class DispatchRateLimiter {
return true;
}
- @SuppressWarnings("deprecation")
- public void onPoliciesUpdate(Policies data) {
- String cluster = brokerService.pulsar().getConfiguration().getClusterName();
-
- DispatchRate dispatchRate;
-
- switch (type) {
- case TOPIC:
- dispatchRate = data.topicDispatchRate.get(cluster);
- if (dispatchRate == null) {
- dispatchRate = data.clusterDispatchRate.get(cluster);
- }
- break;
- case SUBSCRIPTION:
- dispatchRate = data.subscriptionDispatchRate.get(cluster);
- break;
- case REPLICATOR:
- dispatchRate = data.replicatorDispatchRate.get(cluster);
- break;
- default:
- log.error("error DispatchRateLimiter type: {} ", type);
- dispatchRate = null;
- }
-
- // update dispatch-rate only if it's configured in policies else ignore
- if (dispatchRate != null) {
- final DispatchRate newDispatchRate = createDispatchRate();
-
- // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
- // cluster-throttling rate
- if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
- dispatchRate = newDispatchRate;
- }
- updateDispatchRate(dispatchRate);
- }
- }
-
@SuppressWarnings("deprecation")
public static DispatchRateImpl getPoliciesDispatchRate(final String cluster,
Optional<Policies> policies,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 858291e1c7a..abf24420d9b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -1070,7 +1070,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
admin.namespaces().createNamespace(namespace, Sets.newHashSet(cluster));
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
- DispatchRateLimiter dispatchRateLimiter = new DispatchRateLimiter(topic, DispatchRateLimiter.Type.TOPIC);
+ Optional<DispatchRateLimiter> dispatchRateLimiter;
Policies policies = new Policies();
DispatchRateImpl clusterDispatchRate = DispatchRateImpl.builder()
@@ -1085,21 +1085,25 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
.build();
// (1) If both clusterDispatchRate and topicDispatchRate are empty, dispatch throttling is disabled
- dispatchRateLimiter.onPoliciesUpdate(policies);
- Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
- Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);
+ topic.onPoliciesUpdate(policies).get();
+ dispatchRateLimiter = topic.getDispatchRateLimiter();
+ Assert.assertFalse(dispatchRateLimiter.isPresent());
// (2) If topicDispatchRate is empty, clusterDispatchRate is effective
policies.clusterDispatchRate.put(cluster, clusterDispatchRate);
- dispatchRateLimiter.onPoliciesUpdate(policies);
- Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 100);
- Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 512);
+ topic.onPoliciesUpdate(policies).get();
+ dispatchRateLimiter = topic.getDispatchRateLimiter();
+ Assert.assertTrue(dispatchRateLimiter.isPresent());
+ Assert.assertEquals(dispatchRateLimiter.get().getDispatchRateOnMsg(), 100);
+ Assert.assertEquals(dispatchRateLimiter.get().getDispatchRateOnByte(), 512);
// (3) If topicDispatchRate is not empty, topicDispatchRate is effective
policies.topicDispatchRate.put(cluster, topicDispatchRate);
- dispatchRateLimiter.onPoliciesUpdate(policies);
- Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 200);
- Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 1024);
+ topic.onPoliciesUpdate(policies).get();
+ dispatchRateLimiter = topic.getDispatchRateLimiter();
+ Assert.assertTrue(dispatchRateLimiter.isPresent());
+ Assert.assertEquals(dispatchRateLimiter.get().getDispatchRateOnMsg(), 200);
+ Assert.assertEquals(dispatchRateLimiter.get().getDispatchRateOnByte(), 1024);
producer.close();
topic.close().get();