You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/03 08:37:46 UTC
[pulsar] branch master updated: Fix PR #7818 #7802 does not support
admin cli (#7940)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8377396 Fix PR #7818 #7802 does not support admin cli (#7940)
8377396 is described below
commit 83773962dda57f43ce46aa3ac625eb0860d0599d
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Sep 3 16:37:36 2020 +0800
Fix PR #7818 #7802 does not support admin cli (#7940)
### Motivation
PR #7818 #7802 supports topic-level policies.
But the pulsar admin cli java doc is not supported accordingly.
---
.../pulsar/admin/cli/PulsarAdminToolTest.java | 14 ++++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 84 ++++++++++++++++++++++
2 files changed, 98 insertions(+)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a46ee7c..c684329 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -726,6 +726,20 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);
+ cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("remove-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1 -m 999"));
+ verify(mockTopics).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);
+
+ cmdTopics.run(split("get-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("remove-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
+ verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);
+
// argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
// range of +/- 1 second of the expected timestamp
class TimestampMatcher implements ArgumentMatcher<Long> {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index df0d342..6d18fea 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -127,6 +127,12 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
+ jcommander.addCommand("get-max-unacked-messages-on-consumer", new GetMaxUnackedMessagesOnConsumer());
+ jcommander.addCommand("set-max-unacked-messages-on-consumer", new SetMaxUnackedMessagesOnConsumer());
+ jcommander.addCommand("remove-max-unacked-messages-on-consumer", new RemoveMaxUnackedMessagesOnConsumer());
+ jcommander.addCommand("get-max-unacked-messages-on-subscription", new GetMaxUnackedMessagesOnSubscription());
+ jcommander.addCommand("set-max-unacked-messages-on-subscription", new SetMaxUnackedMessagesOnSubscription());
+ jcommander.addCommand("remove-max-unacked-messages-on-subscription", new RemoveMaxUnackedMessagesOnSubscription());
jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
@@ -1170,6 +1176,84 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get max unacked messages policy on consumer for a topic")
+ private class GetMaxUnackedMessagesOnConsumer extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getMaxUnackedMessagesOnConsumer(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove max unacked messages policy on consumer for a topic")
+ private class RemoveMaxUnackedMessagesOnConsumer extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeMaxUnackedMessagesOnConsumer(persistentTopic);
+ }
+ }
+
+ @Parameters(commandDescription = "Set max unacked messages policy on consumer for a topic")
+ private class SetMaxUnackedMessagesOnConsumer extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true)
+ private int maxNum;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum);
+ }
+ }
+
+ @Parameters(commandDescription = "Get max unacked messages policy on subscription for a topic")
+ private class GetMaxUnackedMessagesOnSubscription extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(admin.topics().getMaxUnackedMessagesOnSubscription(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove max unacked messages policy on subscription for a topic")
+ private class RemoveMaxUnackedMessagesOnSubscription extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().removeMaxUnackedMessagesOnSubscription(persistentTopic);
+ }
+ }
+
+ @Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic")
+ private class SetMaxUnackedMessagesOnSubscription extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true)
+ private int maxNum;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ admin.topics().setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum);
+ }
+ }
+
@Parameters(commandDescription = "Get compaction threshold for a topic")
private class GetCompactionThreshold extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)