You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/09 13:49:09 UTC

[pulsar] branch master updated: [clean][broker]Cleanup DispatchRateLimiter#onPoliciesUpdate #15986

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5cbaddcccb7 [clean][broker]Cleanup DispatchRateLimiter#onPoliciesUpdate #15986
5cbaddcccb7 is described below

commit 5cbaddcccb7183bedb3e98895c4fb34e84415f9a
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Thu Jun 9 21:48:59 2022 +0800

    [clean][broker]Cleanup DispatchRateLimiter#onPoliciesUpdate #15986
---
 .../service/persistent/DispatchRateLimiter.java    | 37 ----------------------
 .../client/api/MessageDispatchThrottlingTest.java  | 24 ++++++++------
 2 files changed, 14 insertions(+), 47 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index e8c14c99685..481a97ee15b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -276,43 +276,6 @@ public class DispatchRateLimiter {
         return true;
     }
 
-    @SuppressWarnings("deprecation")
-    public void onPoliciesUpdate(Policies data) {
-        String cluster = brokerService.pulsar().getConfiguration().getClusterName();
-
-        DispatchRate dispatchRate;
-
-        switch (type) {
-            case TOPIC:
-                dispatchRate = data.topicDispatchRate.get(cluster);
-                if (dispatchRate == null) {
-                    dispatchRate = data.clusterDispatchRate.get(cluster);
-                }
-                break;
-            case SUBSCRIPTION:
-                dispatchRate = data.subscriptionDispatchRate.get(cluster);
-                break;
-            case REPLICATOR:
-                dispatchRate = data.replicatorDispatchRate.get(cluster);
-                break;
-            default:
-                log.error("error DispatchRateLimiter type: {} ", type);
-                dispatchRate = null;
-        }
-
-        // update dispatch-rate only if it's configured in policies else ignore
-        if (dispatchRate != null) {
-            final DispatchRate newDispatchRate = createDispatchRate();
-
-            // if policy-throttling rate is disabled and cluster-throttling is enabled then apply
-            // cluster-throttling rate
-            if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
-                dispatchRate = newDispatchRate;
-            }
-            updateDispatchRate(dispatchRate);
-        }
-    }
-
     @SuppressWarnings("deprecation")
     public static DispatchRateImpl getPoliciesDispatchRate(final String cluster,
                                                            Optional<Policies> policies,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
index 858291e1c7a..abf24420d9b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java
@@ -1070,7 +1070,7 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
         admin.namespaces().createNamespace(namespace, Sets.newHashSet(cluster));
         Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
-        DispatchRateLimiter dispatchRateLimiter = new DispatchRateLimiter(topic, DispatchRateLimiter.Type.TOPIC);
+        Optional<DispatchRateLimiter> dispatchRateLimiter;
 
         Policies policies = new Policies();
         DispatchRateImpl clusterDispatchRate = DispatchRateImpl.builder()
@@ -1085,21 +1085,25 @@ public class MessageDispatchThrottlingTest extends ProducerConsumerBase {
                 .build();
 
         // (1) If both clusterDispatchRate and topicDispatchRate are empty, dispatch throttling is disabled
-        dispatchRateLimiter.onPoliciesUpdate(policies);
-        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), -1);
-        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), -1);
+        topic.onPoliciesUpdate(policies).get();
+        dispatchRateLimiter = topic.getDispatchRateLimiter();
+        Assert.assertFalse(dispatchRateLimiter.isPresent());
 
         // (2) If topicDispatchRate is empty, clusterDispatchRate is effective
         policies.clusterDispatchRate.put(cluster, clusterDispatchRate);
-        dispatchRateLimiter.onPoliciesUpdate(policies);
-        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 100);
-        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 512);
+        topic.onPoliciesUpdate(policies).get();
+        dispatchRateLimiter = topic.getDispatchRateLimiter();
+        Assert.assertTrue(dispatchRateLimiter.isPresent());
+        Assert.assertEquals(dispatchRateLimiter.get().getDispatchRateOnMsg(), 100);
+        Assert.assertEquals(dispatchRateLimiter.get().getDispatchRateOnByte(), 512);
 
         // (3) If topicDispatchRate is not empty, topicDispatchRate is effective
         policies.topicDispatchRate.put(cluster, topicDispatchRate);
-        dispatchRateLimiter.onPoliciesUpdate(policies);
-        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnMsg(), 200);
-        Assert.assertEquals(dispatchRateLimiter.getDispatchRateOnByte(), 1024);
+        topic.onPoliciesUpdate(policies).get();
+        dispatchRateLimiter = topic.getDispatchRateLimiter();
+        Assert.assertTrue(dispatchRateLimiter.isPresent());
+        Assert.assertEquals(dispatchRateLimiter.get().getDispatchRateOnMsg(), 200);
+        Assert.assertEquals(dispatchRateLimiter.get().getDispatchRateOnByte(), 1024);
 
         producer.close();
         topic.close().get();