You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/06/02 03:01:30 UTC
[pulsar] branch master updated: Add cli cmd for subscription level dispatch-rate-limiter (#15862)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b1003d1281d Add cli cmd for subscription level dispatch-rate-limiter (#15862)
b1003d1281d is described below
commit b1003d1281d690e3da37d44718d4f335f4cbd23f
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Thu Jun 2 11:01:21 2022 +0800
Add cli cmd for subscription level dispatch-rate-limiter (#15862)
---
.../pulsar/admin/cli/PulsarAdminToolTest.java | 26 +++++++++++++
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 43 +++++++++++++++++-----
2 files changed, 60 insertions(+), 9 deletions(-)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index dd48ff0f317..2b46d2ab760 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -995,6 +995,19 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
+ cmdTopics = new CmdTopicPolicies(() -> admin);
+ cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 3"));
+ verify(mockTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",
+ DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(-1)
+ .ratePeriodInSecond(3)
+ .build());
+ cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub"));
+ verify(mockTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",false);
+ cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub"));
+ verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub");
+
cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0"));
@@ -1293,6 +1306,19 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
+ cmdTopics = new CmdTopicPolicies(() -> admin);
+ cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -md -1 -bd -1 -dt 2 -g"));
+ verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",
+ DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(-1)
+ .ratePeriodInSecond(2)
+ .build());
+ cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -g"));
+ verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", "sub",false);
+ cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -s sub -g"));
+ verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1","sub");
+
cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024 -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 1314f649c86..8a4821df152 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -28,6 +28,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.TopicPolicies;
@@ -1461,10 +1462,18 @@ public class CmdTopicPolicies extends CmdBase {
+ "If set to true, broker returned global topic policies")
private boolean isGlobal = false;
+ @Parameter(names = {"--subscription", "-s"},
+ description = "Get message-dispatch-rate of a specific subscription")
+ private String subName;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
- print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied));
+ if (StringUtils.isBlank(subName)) {
+ print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied));
+ } else {
+ print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, subName, applied));
+ }
}
}
@@ -1495,16 +1504,24 @@ public class CmdTopicPolicies extends CmdBase {
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
+ @Parameter(names = {"--subscription", "-s"},
+ description = "Set message-dispatch-rate for a specific subscription")
+ private String subName;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
- getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic,
- DispatchRate.builder()
- .dispatchThrottlingRateInMsg(msgDispatchRate)
- .dispatchThrottlingRateInByte(byteDispatchRate)
- .ratePeriodInSecond(dispatchRatePeriodSec)
- .relativeToPublishRate(relativeToPublishRate)
- .build());
+ DispatchRate rate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(msgDispatchRate)
+ .dispatchThrottlingRateInByte(byteDispatchRate)
+ .ratePeriodInSecond(dispatchRatePeriodSec)
+ .relativeToPublishRate(relativeToPublishRate)
+ .build();
+ if (StringUtils.isBlank(subName)) {
+ getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, rate);
+ } else {
+ getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic, subName, rate);
+ }
}
}
@@ -1517,10 +1534,18 @@ public class CmdTopicPolicies extends CmdBase {
+ "If set to true, the policy will be replicate to other clusters asynchronously")
private boolean isGlobal = false;
+ @Parameter(names = {"--subscription", "-s"},
+ description = "Remove message-dispatch-rate for a specific subscription")
+ private String subName;
+
@Override
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(params);
- getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
+ if (StringUtils.isBlank(subName)) {
+ getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
+ } else {
+ getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic, subName);
+ }
}
}