You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2023/02/02 23:40:24 UTC

[pulsar] branch master updated: [improve][cli] improve admin `set-backlog-quota` more clear (#19300)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 31fe347b39d [improve][cli] improve admin `set-backlog-quota` more clear (#19300)
31fe347b39d is described below

commit 31fe347b39dcc8ce4c1e8d69eeb04649506bd4c6
Author: labuladong <la...@foxmail.com>
AuthorDate: Fri Feb 3 07:40:17 2023 +0800

    [improve][cli] improve admin `set-backlog-quota` more clear (#19300)
---
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 27 +++++++++-----
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 39 +++++++++++---------
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 43 ++++++++++++----------
 3 files changed, 63 insertions(+), 46 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 37b8c33e114..ccae1b11765 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -114,6 +114,7 @@ import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -437,6 +438,12 @@ public class PulsarAdminToolTest {
         namespaces.run(split("unload myprop/clust/ns1"));
         verify(mockNamespaces).unload("myprop/clust/ns1");
 
+        // message_age must have time limit, destination_storage must have size limit
+        Assert.assertFalse(namespaces.run(
+                split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -t message_age")));
+        Assert.assertFalse(namespaces.run(
+                split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10h -t destination_storage")));
+
         mockNamespaces = mock(Namespaces.class);
         when(admin.namespaces()).thenReturn(mockNamespaces);
         namespaces = new CmdNamespaces(() -> admin);
@@ -498,23 +505,21 @@ public class PulsarAdminToolTest {
         when(admin.namespaces()).thenReturn(mockNamespaces);
         namespaces = new CmdNamespaces(() -> admin);
 
-        namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -l 10K -lt 10m"));
+        namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -lt 10m -t message_age"));
         verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
                 BacklogQuota.builder()
-                        .limitSize(10 * 1024)
                         .limitTime(10 * 60)
                         .retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
                         .build(),
-                        BacklogQuota.BacklogQuotaType.destination_storage);
+                        BacklogQuota.BacklogQuotaType.message_age);
 
         mockNamespaces = mock(Namespaces.class);
         when(admin.namespaces()).thenReturn(mockNamespaces);
         namespaces = new CmdNamespaces(() -> admin);
 
-        namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000 -t message_age"));
+        namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10000 -t message_age"));
         verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
                 BacklogQuota.builder()
-                        .limitSize(10l * 1024 * 1024 * 1024)
                         .limitTime(10000)
                         .retentionPolicy(RetentionPolicy.producer_exception)
                         .build(),
@@ -1216,24 +1221,28 @@ public class PulsarAdminToolTest {
         cmdTopics = new CmdTopicPolicies(() -> admin);
         cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10h"));
         verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10 * 60 * 60);
-        cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction"));
+        cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction -t message_age"));
         verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
                 BacklogQuota.builder()
-                        .limitSize(-1)
                         .limitTime(60 * 60 * 24 * 7)
                         .retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
                         .build(),
-                BacklogQuota.BacklogQuotaType.destination_storage);
+                BacklogQuota.BacklogQuotaType.message_age);
         //cmd with option cannot be executed repeatedly.
         cmdTopics = new CmdTopicPolicies(() -> admin);
         cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t message_age"));
         verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
                 BacklogQuota.builder()
-                        .limitSize(-1)
                         .limitTime(1000)
                         .retentionPolicy(RetentionPolicy.producer_request_hold)
                         .build(),
                 BacklogQuota.BacklogQuotaType.message_age);
+        // message_age needs a time limit and destination_storage needs a size limit, so both commands below must fail (fresh instance per run).
+        cmdTopics = new CmdTopicPolicies(() -> admin);
+        Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 1000 -p producer_request_hold -t message_age")));
+        cmdTopics = new CmdTopicPolicies(() -> admin);
+        Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 60 -p producer_request_hold -t destination_storage")));
+
         //cmd with option cannot be executed repeatedly.
         cmdTopics = new CmdTopicPolicies(() -> admin);
         cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index c92f1f8838c..998591f8177 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1266,7 +1266,7 @@ public class CmdNamespaces extends CmdBase {
         @Parameter(description = "tenant/namespace", required = true)
         private java.util.List<String> params;
 
-        @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true)
+        @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)")
         private String limitStr;
 
         @Parameter(names = { "-lt", "--limitTime" },
@@ -1280,8 +1280,8 @@ public class CmdNamespaces extends CmdBase {
         private String policyStr;
 
         @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: "
-                + "destination_storage and message_age. "
-                + "destination_storage limits backlog by size (in bytes). "
+                + "destination_storage (default) and message_age. "
+                + "destination_storage limits backlog by size. "
                 + "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). "
                 + "You can set size or time to control the backlog, or combine them together to control the backlog. ")
         private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();
@@ -1289,7 +1289,6 @@ public class CmdNamespaces extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             BacklogQuota.RetentionPolicy policy;
-            long limit = validateSizeString(limitStr);
             BacklogQuota.BacklogQuotaType backlogQuotaType;
 
             try {
@@ -1306,26 +1305,30 @@ public class CmdNamespaces extends CmdBase {
                         backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
             }
 
-            long limitTimeInSec = -1;
-            if (limitTimeStr != null) {
+            String namespace = validateNamespace(params);
+
+            BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy);
+            if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) {
+                // set quota by storage size
+                if (limitStr == null) {
+                    throw new ParameterException("Quota type of 'destination_storage' needs a size limit");
+                }
+                long limit = validateSizeString(limitStr);
+                builder.limitSize(limit);
+            } else {
+                // set quota by time
+                if (limitTimeStr == null) {
+                    throw new ParameterException("Quota type of 'message_age' needs a time limit");
+                }
+                long limitTimeInSec;
                 try {
                     limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
                 } catch (IllegalArgumentException e) {
                     throw new ParameterException(e.getMessage());
                 }
+                builder.limitTime((int) limitTimeInSec);
             }
-            if (limitTimeInSec > Integer.MAX_VALUE) {
-                throw new ParameterException(
-                        String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
-            }
-
-            String namespace = validateNamespace(params);
-            getAdmin().namespaces().setBacklogQuota(namespace,
-                    BacklogQuota.builder().limitSize(limit)
-                            .limitTime((int) limitTimeInSec)
-                            .retentionPolicy(policy)
-                            .build(),
-                    backlogQuotaType);
+            getAdmin().namespaces().setBacklogQuota(namespace, builder.build(), backlogQuotaType);
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 7cd3b49796f..d567d0b3671 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -964,7 +964,7 @@ public class CmdTopicPolicies extends CmdBase {
         private java.util.List<String> params;
 
         @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)")
-        private String limitStr = "-1";
+        private String limitStr = null;
 
         @Parameter(names = { "-lt", "--limitTime" },
                 description = "Time limit in second (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), "
@@ -977,8 +977,8 @@ public class CmdTopicPolicies extends CmdBase {
         private String policyStr;
 
         @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: "
-                + "destination_storage and message_age. "
-                + "destination_storage limits backlog by size (in bytes). "
+                + "destination_storage (default) and message_age. "
+                + "destination_storage limits backlog by size. "
                 + "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). "
                 + "You can set size or time to control the backlog, or combine them together to control the backlog. ")
         private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();
@@ -990,7 +990,6 @@ public class CmdTopicPolicies extends CmdBase {
         @Override
         void run() throws PulsarAdminException {
             BacklogQuota.RetentionPolicy policy;
-            long limit;
             BacklogQuota.BacklogQuotaType backlogQuotaType;
 
             try {
@@ -999,35 +998,41 @@ public class CmdTopicPolicies extends CmdBase {
                 throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid options are: %s",
                         policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values())));
             }
-
-            limit = validateSizeString(limitStr);
-
             try {
                 backlogQuotaType = BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaTypeStr);
             } catch (IllegalArgumentException e) {
                 throw new ParameterException(String.format("Invalid backlog quota type '%s'. Valid options are: %s",
                         backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
             }
+            String persistentTopic = validatePersistentTopic(params);
+            BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy);
 
-            long limitTimeInSec = -1;
-            if (limitTimeStr != null) {
+            if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) {
+                // set quota by storage size
+                if (limitStr == null) {
+                    throw new ParameterException("Quota type of 'destination_storage' needs a size limit");
+                }
+                long limit = validateSizeString(limitStr);
+                builder.limitSize((int) limit);
+            } else {
+                // set quota by time
+                if (limitTimeStr == null) {
+                    throw new ParameterException("Quota type of 'message_age' needs a time limit");
+                }
+                long limitTimeInSec;
                 try {
                     limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
                 } catch (IllegalArgumentException e) {
                     throw new ParameterException(e.getMessage());
                 }
+                if (limitTimeInSec > Integer.MAX_VALUE) {
+                    throw new ParameterException(
+                            String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
+                }
+                builder.limitTime((int) limitTimeInSec);
             }
-            if (limitTimeInSec > Integer.MAX_VALUE) {
-                throw new ParameterException(
-                        String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
-            }
-
-            String persistentTopic = validatePersistentTopic(params);
             getTopicPolicies(isGlobal).setBacklogQuota(persistentTopic,
-                    BacklogQuota.builder().limitSize(limit)
-                            .limitTime((int) limitTimeInSec)
-                            .retentionPolicy(policy)
-                            .build(),
+                    builder.build(),
                     backlogQuotaType);
         }
     }