You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/03/28 03:57:36 UTC
[pulsar] branch master updated: Add topics CLI support to get
partitioned internal stats. (#6624)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aa66b12 Add topics CLI support to get partitioned internal stats. (#6624)
aa66b12 is described below
commit aa66b12cc759c23c9fdb2851580273f77c281154
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Sat Mar 28 11:57:24 2020 +0800
Add topics CLI support to get partitioned internal stats. (#6624)
### Motivation
Currently `Topics.getPartitionedInternalStats` only exposed to `CmdPersistentTopics` that has been deprecated and hidden in `CmdTopics`, users can not use this method through `topics` CLI.
### Modifications
Add `topics` CLI support to get partitioned internal stats and fix some missing subcommands in doc.
---
.../apache/pulsar/broker/admin/AdminApiTest.java | 41 ++++++++++++++++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 6 +++
.../org/apache/pulsar/admin/cli/CmdTopics.java | 14 ++++++
site2/docs/reference-pulsar-admin.md | 54 ++++++++++++++++------
4 files changed, 101 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 2b89e96..4b9d058 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -112,6 +112,7 @@ import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -998,6 +999,46 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
assertEquals(gson.toJson(partitionTopicInfo), expectedResult);
}
+ @Test
+ public void testGetPartitionedStatsInternal() throws Exception {
+ String partitionedTopic = "my-topic";
+ String subName = "my-sub";
+ assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), Lists.newArrayList());
+ final String partitionedTopicName = "persistent://prop-xyz/ns1/" + partitionedTopic;
+ admin.topics().createPartitionedTopic(partitionedTopicName, 2);
+ assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), Lists.newArrayList(partitionedTopicName));
+ assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
+
+ // create consumer and subscription
+ pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName(subName).subscribe();
+
+ // publish several messages
+ publishMessagesOnPersistentTopic(partitionedTopicName, 10);
+
+ String partitionTopic0 = partitionedTopicName + "-partition-0";
+ String partitionTopic1 = partitionedTopicName + "-partition-1";
+
+ PersistentTopicInternalStats internalStats0 = admin.topics().getInternalStats(partitionTopic0);
+ assertEquals(internalStats0.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList(Codec.encode(subName))));
+
+ PersistentTopicInternalStats internalStats1 = admin.topics().getInternalStats(partitionTopic1);
+ assertEquals(internalStats1.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList(Codec.encode(subName))));
+
+ // expected internal stats
+ PartitionedTopicMetadata partitionedTopicMetadata = new PartitionedTopicMetadata(2);
+ PartitionedTopicInternalStats expectedInternalStats = new PartitionedTopicInternalStats(partitionedTopicMetadata);
+ expectedInternalStats.partitions.put(partitionTopic0, internalStats0);
+ expectedInternalStats.partitions.put(partitionTopic1, internalStats1);
+
+ // partitioned internal stats
+ PartitionedTopicInternalStats partitionedInternalStats = admin.topics().getPartitionedInternalStats(partitionedTopicName);
+
+ String expectedResult = ObjectMapperFactory.getThreadLocal().writeValueAsString(expectedInternalStats);
+ String result = ObjectMapperFactory.getThreadLocal().writeValueAsString(partitionedInternalStats);
+
+ assertEquals(result, expectedResult);
+ }
+
@Test(dataProvider = "numBundles")
public void testDeleteNamespaceBundle(Integer numBundles) throws Exception {
admin.namespaces().deleteNamespace("prop-xyz/ns1");
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index c67f60a..0cea786 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -662,6 +662,9 @@ public class PulsarAdminToolTest {
cmdTopics.run(split("partitioned-stats persistent://myprop/clust/ns1/ds1 --per-partition"));
verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true, false);
+ cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("clear-backlog persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1");
@@ -748,6 +751,9 @@ public class PulsarAdminToolTest {
topics.run(split("partitioned-stats persistent://myprop/clust/ns1/ds1 --per-partition"));
verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true);
+ topics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");
+
topics.run(split("skip-all persistent://myprop/clust/ns1/ds1 -s sub1"));
verify(mockTopics).skipAllMessages("persistent://myprop/clust/ns1/ds1", "sub1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 079405c..84cbf60 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -74,6 +74,7 @@ public class CmdTopics extends CmdBase {
jcommander.addCommand("info-internal", new GetInternalInfo());
jcommander.addCommand("partitioned-stats", new GetPartitionedStats());
+ jcommander.addCommand("partitioned-stats-internal", new GetPartitionedStatsInternal());
jcommander.addCommand("skip", new Skip());
jcommander.addCommand("clear-backlog", new ClearBacklog());
@@ -431,6 +432,19 @@ public class CmdTopics extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the internal stats for the partitioned topic and its connected producers and consumers. \n"
+ + "\t All the rates are computed over a 1 minute window and are relative the last completed 1 minute period.")
+ private class GetPartitionedStatsInternal extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic\n", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws Exception {
+ String topic = validateTopicName(params);
+ print(topics.getPartitionedInternalStats(topic));
+ }
+ }
+
@Parameters(commandDescription = "Skip all the messages for the subscription")
private class ClearBacklog extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md
index fe258e2..b74c46c 100644
--- a/site2/docs/reference-pulsar-admin.md
+++ b/site2/docs/reference-pulsar-admin.md
@@ -1692,8 +1692,8 @@ Subcommands
* `create`
* `get-partitioned-topic-metadata`
* `update-partitioned-topic`
+* `list-partitioned-topics`
* `list`
-* `list-in-bundle`
* `terminate`
* `permissions`
* `grant-permission`
@@ -1702,18 +1702,21 @@ Subcommands
* `bundle-range`
* `delete`
* `unload`
+* `create-subscription`
* `subscriptions`
* `unsubscribe`
* `stats`
* `stats-internal`
* `info-internal`
* `partitioned-stats`
+* `partitioned-stats-internal`
* `skip`
* `clear-backlog`
* `expire-messages`
* `expire-messages-all-subscriptions`
* `peek-messages`
* `reset-cursor`
+* `last-message-id`
### `compact`
@@ -1845,28 +1848,22 @@ Options
|---|---|---|
|`-p`, `--partitions`|The number of partitions for the topic|0|
-### `list`
-Get the list of topics under a namespace
+### `list-partitioned-topics`
+Get the list of partitioned topics under a namespace.
Usage
-```
-$ pulsar-admin topics list tenant/cluster/namespace
+```bash
+$ pulsar-admin topics list-partitioned-topics tenant/namespace
```
-### `list-in-bundle`
-Get a list of non-persistent topics present under a namespace bundle
+### `list`
+Get the list of topics under a namespace
Usage
```
-$ pulsar-admin topics list-in-bundle tenant/namespace options
+$ pulsar-admin topics list tenant/cluster/namespace
```
-Options
-|Flag|Description|Default|
-|---|---|---|
-|`-b`, `--bundle`|The bundle range||
-
-
### `terminate`
Terminate a topic (disallow further messages from being published on the topic)
@@ -1938,6 +1935,20 @@ Usage
$ pulsar-admin topics unload topic
```
+### `create-subscription`
+Create a new subscription on a topic.
+
+Usage
+```bash
+$ pulsar-admin topics create-subscription [options] persistent://tenant/namespace/topic
+```
+
+Options
+|Flag|Description|Default|
+|---|---|---|
+|`-m`, `--messageId`|messageId where to create the subscription. It can be either 'latest', 'earliest' or (ledgerId:entryId)|latest|
+|`-s`, `--subscription`|Subscription to reset position on||
+
### `subscriptions`
Get the list of subscriptions on the topic
@@ -2001,6 +2012,14 @@ Options
|---|---|---|
|`--per-partition`|Get per-partition stats|false|
+### `partitioned-stats-internal`
+Get the internal stats for the partitioned topic and its connected producers and consumers. All the rates are computed over a 1 minute window and are relative the last completed 1 minute period.
+
+Usage
+```bash
+$ pulsar-admin topics partitioned-stats-internal topic
+```
+
### `skip`
Skip some messages for the subscription
@@ -2091,6 +2110,13 @@ Options
|`-t`, `--time`|The time in minutes to reset back to (or minutes, hours, days, weeks, etc.). Examples: `100m`, `3h`, `2d`, `5w`.||
|`-m`, `--messageId`| The messageId to reset back to (ledgerId:entryId). ||
+### `last-message-id`
+Get the last commit message id of topic.
+
+Usage
+```bash
+$ pulsar-admin topics last-message-id persistent://tenant/namespace/topic
+```
## `tenants`