You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/03/28 03:57:36 UTC

[pulsar] branch master updated: Add topics CLI support to get partitioned internal stats. (#6624)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aa66b12  Add topics CLI support to get partitioned internal stats. (#6624)
aa66b12 is described below

commit aa66b12cc759c23c9fdb2851580273f77c281154
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Sat Mar 28 11:57:24 2020 +0800

    Add topics CLI support to get partitioned internal stats. (#6624)
    
    ### Motivation
    
    Currently `Topics.getPartitionedInternalStats` only exposed to `CmdPersistentTopics` that has been deprecated and hidden in `CmdTopics`, users can not use this method through `topics` CLI.
    
    ### Modifications
    
    Add `topics` CLI support to get partitioned internal stats and fix some missing subcommands in doc.
---
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 41 ++++++++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  6 +++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 14 ++++++
 site2/docs/reference-pulsar-admin.md               | 54 ++++++++++++++++------
 4 files changed, 101 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 2b89e96..4b9d058 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -112,6 +112,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -998,6 +999,46 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(gson.toJson(partitionTopicInfo), expectedResult);
     }
 
+    @Test
+    public void testGetPartitionedStatsInternal() throws Exception {
+        String partitionedTopic = "my-topic";
+        String subName = "my-sub";
+        assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), Lists.newArrayList());
+        final String partitionedTopicName = "persistent://prop-xyz/ns1/" + partitionedTopic;
+        admin.topics().createPartitionedTopic(partitionedTopicName, 2);
+        assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), Lists.newArrayList(partitionedTopicName));
+        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
+
+        // create consumer and subscription
+        pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName(subName).subscribe();
+
+        // publish several messages
+        publishMessagesOnPersistentTopic(partitionedTopicName, 10);
+
+        String partitionTopic0 = partitionedTopicName + "-partition-0";
+        String partitionTopic1 = partitionedTopicName + "-partition-1";
+
+        PersistentTopicInternalStats internalStats0 = admin.topics().getInternalStats(partitionTopic0);
+        assertEquals(internalStats0.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList(Codec.encode(subName))));
+
+        PersistentTopicInternalStats internalStats1 = admin.topics().getInternalStats(partitionTopic1);
+        assertEquals(internalStats1.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList(Codec.encode(subName))));
+
+        // expected internal stats
+        PartitionedTopicMetadata partitionedTopicMetadata = new PartitionedTopicMetadata(2);
+        PartitionedTopicInternalStats expectedInternalStats = new PartitionedTopicInternalStats(partitionedTopicMetadata);
+        expectedInternalStats.partitions.put(partitionTopic0, internalStats0);
+        expectedInternalStats.partitions.put(partitionTopic1, internalStats1);
+
+        // partitioned internal stats
+        PartitionedTopicInternalStats partitionedInternalStats = admin.topics().getPartitionedInternalStats(partitionedTopicName);
+
+        String expectedResult = ObjectMapperFactory.getThreadLocal().writeValueAsString(expectedInternalStats);
+        String result = ObjectMapperFactory.getThreadLocal().writeValueAsString(partitionedInternalStats);
+
+        assertEquals(result, expectedResult);
+    }
+
     @Test(dataProvider = "numBundles")
     public void testDeleteNamespaceBundle(Integer numBundles) throws Exception {
         admin.namespaces().deleteNamespace("prop-xyz/ns1");
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index c67f60a..0cea786 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -662,6 +662,9 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("partitioned-stats persistent://myprop/clust/ns1/ds1 --per-partition"));
         verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true, false);
 
+        cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("clear-backlog persistent://myprop/clust/ns1/ds1 -s sub1"));
         verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1");
 
@@ -748,6 +751,9 @@ public class PulsarAdminToolTest {
         topics.run(split("partitioned-stats persistent://myprop/clust/ns1/ds1 --per-partition"));
         verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true);
 
+        topics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");
+
         topics.run(split("skip-all persistent://myprop/clust/ns1/ds1 -s sub1"));
         verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1");
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 079405c..84cbf60 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -74,6 +74,7 @@ public class CmdTopics extends CmdBase {
         jcommander.addCommand("info-internal", new GetInternalInfo());
 
         jcommander.addCommand("partitioned-stats", new GetPartitionedStats());
+        jcommander.addCommand("partitioned-stats-internal", new GetPartitionedStatsInternal());
 
         jcommander.addCommand("skip", new Skip());
         jcommander.addCommand("clear-backlog", new ClearBacklog());
@@ -431,6 +432,19 @@ public class CmdTopics extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the internal stats for the partitioned topic and its connected producers and consumers. \n"
+            + "\t       All the rates are computed over a 1 minute window and are relative the last completed 1 minute period.")
+    private class GetPartitionedStatsInternal extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws Exception {
+            String topic = validateTopicName(params);
+            print(topics.getPartitionedInternalStats(topic));
+        }
+    }
+
     @Parameters(commandDescription = "Skip all the messages for the subscription")
     private class ClearBacklog extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index fe258e2..b74c46c 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1692,8 +1692,8 @@ Subcommands
 * `create`
 * `get-partitioned-topic-metadata`
 * `update-partitioned-topic`
+* `list-partitioned-topics`
 * `list`
-* `list-in-bundle`
 * `terminate`
 * `permissions`
 * `grant-permission`
@@ -1702,18 +1702,21 @@ Subcommands
 * `bundle-range`
 * `delete`
 * `unload`
+* `create-subscription`
 * `subscriptions`
 * `unsubscribe`
 * `stats`
 * `stats-internal`
 * `info-internal`
 * `partitioned-stats`
+* `partitioned-stats-internal`
 * `skip`
 * `clear-backlog`
 * `expire-messages`
 * `expire-messages-all-subscriptions`
 * `peek-messages`
 * `reset-cursor`
+* `last-message-id`
 
 
 ### `compact`
@@ -1845,28 +1848,22 @@ Options
 |---|---|---|
 |`-p`, `--partitions`|The number of partitions for the topic|0|
 
-### `list`
-Get the list of topics under a namespace
+### `list-partitioned-topics`
+Get the list of partitioned topics under a namespace.
 
 Usage
-```
-$ pulsar-admin topics list tenant/cluster/namespace
+```bash
+$ pulsar-admin topics list-partitioned-topics tenant/namespace
 ```
 
-### `list-in-bundle`
-Get a list of non-persistent topics present under a namespace bundle
+### `list`
+Get the list of topics under a namespace
 
 Usage
 ```
-$ pulsar-admin topics list-in-bundle tenant/namespace options
+$ pulsar-admin topics list tenant/cluster/namespace
 ```
 
-Options
-|Flag|Description|Default|
-|---|---|---|
-|`-b`, `--bundle`|The bundle range||
-
-
 ### `terminate`
 Terminate a topic (disallow further messages from being published on the topic)
 
@@ -1938,6 +1935,20 @@ Usage
 $ pulsar-admin topics unload topic
 ```
 
+### `create-subscription`
+Create a new subscription on a topic.
+
+Usage
+```bash
+$ pulsar-admin topics create-subscription [options] persistent://tenant/namespace/topic
+```
+
+Options
+|Flag|Description|Default|
+|---|---|---|
+|`-m`, `--messageId`|messageId where to create the subscription. It can be either 'latest', 'earliest' or (ledgerId:entryId)|latest|
+|`-s`, `--subscription`|Subscription to reset position on||
+
 ### `subscriptions`
 Get the list of subscriptions on the topic
 
@@ -2001,6 +2012,14 @@ Options
 |---|---|---|
 |`--per-partition`|Get per-partition stats|false|
 
+### `partitioned-stats-internal`
+Get the internal stats for the partitioned topic and its connected producers and consumers. All the rates are computed over a 1 minute window and are relative the last completed 1 minute period.
+
+Usage
+```bash
+$ pulsar-admin topics partitioned-stats-internal topic
+```
+
 
 ### `skip`
 Skip some messages for the subscription
@@ -2091,6 +2110,13 @@ Options
 |`-t`, `--time`|The time in minutes to reset back to (or minutes, hours, days, weeks, etc.). Examples: `100m`, `3h`, `2d`, `5w`.||
 |`-m`, `--messageId`| The messageId to reset back to (ledgerId:entryId). ||
 
+### `last-message-id`
+Get the last commit message id of topic.
+
+Usage
+```bash
+$ pulsar-admin topics last-message-id persistent://tenant/namespace/topic
+```
 
 
 ## `tenants`