You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/03 08:37:46 UTC

[pulsar] branch master updated: Fix PR #7818 #7802 does not support admin cli (#7940)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8377396  Fix PR #7818 #7802 does not support admin cli (#7940)
8377396 is described below

commit 83773962dda57f43ce46aa3ac625eb0860d0599d
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Thu Sep 3 16:37:36 2020 +0800

    Fix PR #7818 #7802 does not support admin cli (#7940)
    
    ### Motivation
    PR #7818 #7802 supports topic-level policies.
    But the pulsar admin cli java doc is not supported accordingly.
---
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 14 ++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 84 ++++++++++++++++++++++
 2 files changed, 98 insertions(+)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a46ee7c..c684329 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -726,6 +726,20 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("peek-messages persistent://myprop/clust/ns1/ds1 -s sub1 -n 3"));
         verify(mockTopics).peekMessages("persistent://myprop/clust/ns1/ds1", "sub1", 3);
 
+        cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("remove-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1 -m 999"));
+        verify(mockTopics).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);
+
+        cmdTopics.run(split("get-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("remove-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-max-unacked-messages-on-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
+        verify(mockTopics).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);
+
         // argument matcher for the timestamp in reset cursor. Since we can't verify exact timestamp, we check for a
         // range of +/- 1 second of the expected timestamp
         class TimestampMatcher implements ArgumentMatcher<Long> {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index df0d342..6d18fea 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -127,6 +127,12 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
         jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
         jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
+        jcommander.addCommand("get-max-unacked-messages-on-consumer", new GetMaxUnackedMessagesOnConsumer());
+        jcommander.addCommand("set-max-unacked-messages-on-consumer", new SetMaxUnackedMessagesOnConsumer());
+        jcommander.addCommand("remove-max-unacked-messages-on-consumer", new RemoveMaxUnackedMessagesOnConsumer());
+        jcommander.addCommand("get-max-unacked-messages-on-subscription", new GetMaxUnackedMessagesOnSubscription());
+        jcommander.addCommand("set-max-unacked-messages-on-subscription", new SetMaxUnackedMessagesOnSubscription());
+        jcommander.addCommand("remove-max-unacked-messages-on-subscription", new RemoveMaxUnackedMessagesOnSubscription());
         jcommander.addCommand("get-publish-rate", new GetPublishRate());
         jcommander.addCommand("set-publish-rate", new SetPublishRate());
         jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
@@ -1170,6 +1176,84 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get max unacked messages policy on consumer for a topic")
+    private class GetMaxUnackedMessagesOnConsumer extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getMaxUnackedMessagesOnConsumer(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max unacked messages policy on consumer for a topic")
+    private class RemoveMaxUnackedMessagesOnConsumer extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeMaxUnackedMessagesOnConsumer(persistentTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Set max unacked messages policy on consumer for a topic")
+    private class SetMaxUnackedMessagesOnConsumer extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true)
+        private int maxNum;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum);
+        }
+    }
+
+    @Parameters(commandDescription = "Get max unacked messages policy on subscription for a topic")
+    private class GetMaxUnackedMessagesOnSubscription extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(admin.topics().getMaxUnackedMessagesOnSubscription(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max unacked messages policy on subscription for a topic")
+    private class RemoveMaxUnackedMessagesOnSubscription extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().removeMaxUnackedMessagesOnSubscription(persistentTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic")
+    private class SetMaxUnackedMessagesOnSubscription extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true)
+        private int maxNum;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            admin.topics().setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum);
+        }
+    }
+
     @Parameters(commandDescription = "Get compaction threshold for a topic")
     private class GetCompactionThreshold extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)