You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2023/02/02 23:40:24 UTC
[pulsar] branch master updated: [improve][cli] improve admin `set-backlog-quota` more clear (#19300)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 31fe347b39d [improve][cli] improve admin `set-backlog-quota` more clear (#19300)
31fe347b39d is described below
commit 31fe347b39dcc8ce4c1e8d69eeb04649506bd4c6
Author: labuladong <la...@foxmail.com>
AuthorDate: Fri Feb 3 07:40:17 2023 +0800
[improve][cli] improve admin `set-backlog-quota` more clear (#19300)
---
.../pulsar/admin/cli/PulsarAdminToolTest.java | 27 +++++++++-----
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 39 +++++++++++---------
.../apache/pulsar/admin/cli/CmdTopicPolicies.java | 43 ++++++++++++----------
3 files changed, 63 insertions(+), 46 deletions(-)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 37b8c33e114..ccae1b11765 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -114,6 +114,7 @@ import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.Test;
@Slf4j
@@ -437,6 +438,12 @@ public class PulsarAdminToolTest {
namespaces.run(split("unload myprop/clust/ns1"));
verify(mockNamespaces).unload("myprop/clust/ns1");
+ // message_age must have time limit, destination_storage must have size limit
+ Assert.assertFalse(namespaces.run(
+ split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -t message_age")));
+ Assert.assertFalse(namespaces.run(
+ split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10h -t destination_storage")));
+
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
@@ -498,23 +505,21 @@ public class PulsarAdminToolTest {
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
- namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -l 10K -lt 10m"));
+ namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p consumer_backlog_eviction -lt 10m -t message_age"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
BacklogQuota.builder()
- .limitSize(10 * 1024)
.limitTime(10 * 60)
.retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
.build(),
- BacklogQuota.BacklogQuotaType.destination_storage);
+ BacklogQuota.BacklogQuotaType.message_age);
mockNamespaces = mock(Namespaces.class);
when(admin.namespaces()).thenReturn(mockNamespaces);
namespaces = new CmdNamespaces(() -> admin);
- namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000 -t message_age"));
+ namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -lt 10000 -t message_age"));
verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1",
BacklogQuota.builder()
- .limitSize(10l * 1024 * 1024 * 1024)
.limitTime(10000)
.retentionPolicy(RetentionPolicy.producer_exception)
.build(),
@@ -1216,24 +1221,28 @@ public class PulsarAdminToolTest {
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10h"));
verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10 * 60 * 60);
- cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction"));
+ cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1w -p consumer_backlog_eviction -t message_age"));
verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
- .limitSize(-1)
.limitTime(60 * 60 * 24 * 7)
.retentionPolicy(RetentionPolicy.consumer_backlog_eviction)
.build(),
- BacklogQuota.BacklogQuotaType.destination_storage);
+ BacklogQuota.BacklogQuotaType.message_age);
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t message_age"));
verify(mockTopicsPolicies).setBacklogQuota("persistent://myprop/clust/ns1/ds1",
BacklogQuota.builder()
- .limitSize(-1)
.limitTime(1000)
.retentionPolicy(RetentionPolicy.producer_request_hold)
.build(),
BacklogQuota.BacklogQuotaType.message_age);
+ //cmd with option cannot be executed repeatedly.
+ cmdTopics = new CmdTopicPolicies(() -> admin);
+ Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 1000 -p producer_request_hold -t message_age")));
+ cmdTopics = new CmdTopicPolicies(() -> admin);
+ Assert.assertFalse(cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 60 -p producer_request_hold -t destination_storage")));
+
//cmd with option cannot be executed repeatedly.
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index c92f1f8838c..998591f8177 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1266,7 +1266,7 @@ public class CmdNamespaces extends CmdBase {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;
- @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true)
+ @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)")
private String limitStr;
@Parameter(names = { "-lt", "--limitTime" },
@@ -1280,8 +1280,8 @@ public class CmdNamespaces extends CmdBase {
private String policyStr;
@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: "
- + "destination_storage and message_age. "
- + "destination_storage limits backlog by size (in bytes). "
+ + "destination_storage (default) and message_age. "
+ + "destination_storage limits backlog by size. "
+ "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). "
+ "You can set size or time to control the backlog, or combine them together to control the backlog. ")
private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();
@@ -1289,7 +1289,6 @@ public class CmdNamespaces extends CmdBase {
@Override
void run() throws PulsarAdminException {
BacklogQuota.RetentionPolicy policy;
- long limit = validateSizeString(limitStr);
BacklogQuota.BacklogQuotaType backlogQuotaType;
try {
@@ -1306,26 +1305,30 @@ public class CmdNamespaces extends CmdBase {
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}
- long limitTimeInSec = -1;
- if (limitTimeStr != null) {
+ String namespace = validateNamespace(params);
+
+ BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy);
+ if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) {
+ // set quota by storage size
+ if (limitStr == null) {
+ throw new ParameterException("Quota type of 'destination_storage' needs a size limit");
+ }
+ long limit = validateSizeString(limitStr);
+ builder.limitSize(limit);
+ } else {
+ // set quota by time
+ if (limitTimeStr == null) {
+ throw new ParameterException("Quota type of 'message_age' needs a time limit");
+ }
+ long limitTimeInSec;
try {
limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
+ builder.limitTime((int) limitTimeInSec);
}
- if (limitTimeInSec > Integer.MAX_VALUE) {
- throw new ParameterException(
- String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
- }
-
- String namespace = validateNamespace(params);
- getAdmin().namespaces().setBacklogQuota(namespace,
- BacklogQuota.builder().limitSize(limit)
- .limitTime((int) limitTimeInSec)
- .retentionPolicy(policy)
- .build(),
- backlogQuotaType);
+ getAdmin().namespaces().setBacklogQuota(namespace, builder.build(), backlogQuotaType);
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 7cd3b49796f..d567d0b3671 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -964,7 +964,7 @@ public class CmdTopicPolicies extends CmdBase {
private java.util.List<String> params;
@Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)")
- private String limitStr = "-1";
+ private String limitStr = null;
@Parameter(names = { "-lt", "--limitTime" },
description = "Time limit in second (or minutes, hours, days, weeks eg: 100m, 3h, 2d, 5w), "
@@ -977,8 +977,8 @@ public class CmdTopicPolicies extends CmdBase {
private String policyStr;
@Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set. Valid options are: "
- + "destination_storage and message_age. "
- + "destination_storage limits backlog by size (in bytes). "
+ + "destination_storage (default) and message_age. "
+ + "destination_storage limits backlog by size. "
+ "message_age limits backlog by time, that is, message timestamp (broker or publish timestamp). "
+ "You can set size or time to control the backlog, or combine them together to control the backlog. ")
private String backlogQuotaTypeStr = BacklogQuota.BacklogQuotaType.destination_storage.name();
@@ -990,7 +990,6 @@ public class CmdTopicPolicies extends CmdBase {
@Override
void run() throws PulsarAdminException {
BacklogQuota.RetentionPolicy policy;
- long limit;
BacklogQuota.BacklogQuotaType backlogQuotaType;
try {
@@ -999,35 +998,41 @@ public class CmdTopicPolicies extends CmdBase {
throw new ParameterException(String.format("Invalid retention policy type '%s'. Valid options are: %s",
policyStr, Arrays.toString(BacklogQuota.RetentionPolicy.values())));
}
-
- limit = validateSizeString(limitStr);
-
try {
backlogQuotaType = BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaTypeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(String.format("Invalid backlog quota type '%s'. Valid options are: %s",
backlogQuotaTypeStr, Arrays.toString(BacklogQuota.BacklogQuotaType.values())));
}
+ String persistentTopic = validatePersistentTopic(params);
+ BacklogQuota.Builder builder = BacklogQuota.builder().retentionPolicy(policy);
- long limitTimeInSec = -1;
- if (limitTimeStr != null) {
+ if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage) {
+ // set quota by storage size
+ if (limitStr == null) {
+ throw new ParameterException("Quota type of 'destination_storage' needs a size limit");
+ }
+ long limit = validateSizeString(limitStr);
+ builder.limitSize((int) limit);
+ } else {
+ // set quota by time
+ if (limitTimeStr == null) {
+ throw new ParameterException("Quota type of 'message_age' needs a time limit");
+ }
+ long limitTimeInSec;
try {
limitTimeInSec = RelativeTimeUtil.parseRelativeTimeInSeconds(limitTimeStr);
} catch (IllegalArgumentException e) {
throw new ParameterException(e.getMessage());
}
+ if (limitTimeInSec > Integer.MAX_VALUE) {
+ throw new ParameterException(
+ String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
+ }
+ builder.limitTime((int) limitTimeInSec);
}
- if (limitTimeInSec > Integer.MAX_VALUE) {
- throw new ParameterException(
- String.format("Time limit cannot be greater than %d seconds", Integer.MAX_VALUE));
- }
-
- String persistentTopic = validatePersistentTopic(params);
getTopicPolicies(isGlobal).setBacklogQuota(persistentTopic,
- BacklogQuota.builder().limitSize(limit)
- .limitTime((int) limitTimeInSec)
- .retentionPolicy(policy)
- .build(),
+ builder.build(),
backlogQuotaType);
}
}