You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/02 03:01:30 UTC

[pulsar] branch master updated: Add cli cmd for subscription level dispatch-rate-limiter (#15862)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b1003d1281d Add cli cmd for subscription level dispatch-rate-limiter (#15862)
b1003d1281d is described below

commit b1003d1281d690e3da37d44718d4f335f4cbd23f
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Thu Jun 2 11:01:21 2022 +0800

    Add cli cmd for subscription level dispatch-rate-limiter (#15862)
---
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 26 +++++++++++++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 43 +++++++++++++++++-----
 2 files changed, 60 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index dd48ff0f317..2b46d2ab760 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -995,6 +995,19 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics = new CmdTopicPolicies(() -> admin);
+        cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 3"));
+        verify(mockTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",
+                DispatchRate.builder()
+                        .dispatchThrottlingRateInMsg(-1)
+                        .dispatchThrottlingRateInByte(-1)
+                        .ratePeriodInSecond(3)
+                        .build());
+        cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub"));
+        verify(mockTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",false);
+        cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub"));
+        verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub");
+
         cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0"));
@@ -1293,6 +1306,19 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics = new CmdTopicPolicies(() -> admin);
+        cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 2 -g"));
+        verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",
+                DispatchRate.builder()
+                        .dispatchThrottlingRateInMsg(-1)
+                        .dispatchThrottlingRateInByte(-1)
+                        .ratePeriodInSecond(2)
+                        .build());
+        cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -g"));
+        verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",false);
+        cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -g"));
+        verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1","sub");
+
         cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024 -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 1314f649c86..8a4821df152 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.TopicPolicies;
@@ -1461,10 +1462,18 @@ public class CmdTopicPolicies extends CmdBase {
                 + "If set to true, broker returned global topic policies")
         private boolean isGlobal = false;
 
+        @Parameter(names = {"--subscription", "-s"},
+                description = "Get message-dispatch-rate of a specific subscription")
+        private String subName;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied));
+            if (StringUtils.isBlank(subName)) {
+                print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied));
+            } else {
+                print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, subName, applied));
+            }
         }
     }
 
@@ -1495,16 +1504,24 @@ public class CmdTopicPolicies extends CmdBase {
                 + "If set to true, the policy will be replicate to other clusters asynchronously")
         private boolean isGlobal = false;
 
+        @Parameter(names = {"--subscription", "-s"},
+                description = "Set message-dispatch-rate for a specific subscription")
+        private String subName;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic,
-                    DispatchRate.builder()
-                            .dispatchThrottlingRateInMsg(msgDispatchRate)
-                            .dispatchThrottlingRateInByte(byteDispatchRate)
-                            .ratePeriodInSecond(dispatchRatePeriodSec)
-                            .relativeToPublishRate(relativeToPublishRate)
-                            .build());
+            DispatchRate rate = DispatchRate.builder()
+                    .dispatchThrottlingRateInMsg(msgDispatchRate)
+                    .dispatchThrottlingRateInByte(byteDispatchRate)
+                    .ratePeriodInSecond(dispatchRatePeriodSec)
+                    .relativeToPublishRate(relativeToPublishRate)
+                    .build();
+            if (StringUtils.isBlank(subName)) {
+                getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, rate);
+            } else {
+                getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, subName, rate);
+            }
         }
     }
 
@@ -1517,10 +1534,18 @@ public class CmdTopicPolicies extends CmdBase {
                 + "If set to true, the policy will be replicate to other clusters asynchronously")
         private boolean isGlobal = false;
 
+        @Parameter(names = {"--subscription", "-s"},
+                description = "Remove message-dispatch-rate for a specific subscription")
+        private String subName;
+
         @Override
         void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
-            getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
+            if (StringUtils.isBlank(subName)) {
+                getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
+            } else {
+                getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic, subName);
+            }
         }
 
     }