You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/08/09 00:37:50 UTC

[pulsar] branch master updated: Fix MsgDropRate missing from NonPersistentTopics stats output. (#11119)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0aca5f9  Fix MsgDropRate missing from NonPersistentTopics stats output. (#11119)
0aca5f9 is described below

commit 0aca5f9153afc7804a3ae9b169346a06ee9811d9
Author: Marvin Cai <zx...@streamnative.io>
AuthorDate: Sun Aug 8 17:36:19 2021 -0700

    Fix MsgDropRate missing from NonPersistentTopics stats output. (#11119)
    
    Fixes #https://github.com/apache/pulsar/issues/10495
    
    ### Motivation
    MsgDropRate info is missing after NonPersistentTopics admin api merged with Topics admin api. This PR is trying to fix this.
    
    ### Modifications
    Seems due to API merging, data is not properly deserialized in admin client.
    And also due to the added TopicsStats interface, the field hiding causing weird behavior with Jackson so fields in NonPersistentTopicStatsImpl intended to hide superclass' fields are not shown in output.
    
    Fixing by not using same field name to hide superclass fields and use @JsonIgnore to hide them from output. And add new fields to store subscription/publisher/replicator info for NonPersistentTopic.
    This does change the output name of those info, but it only changed in cli output, for admin client the old getSubscriptions/getSubscriptions/getReplication will still work.
---
 .../broker/admin/impl/PersistentTopicsBase.java    |   5 +-
 .../broker/admin/v2/NonPersistentTopics.java       | 116 +++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  14 +--
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  20 ++--
 ...ava => NonPersistentPartitionedTopicStats.java} |  12 +--
 .../policies/data/PartitionedTopicStats.java       |   4 +-
 .../pulsar/client/admin/internal/TopicsImpl.java   |  94 ++++++++++++-----
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  30 +++---
 .../NonPersistentPartitionedTopicStatsImpl.java    |  63 +++++++++++
 .../data/stats/NonPersistentTopicStatsImpl.java    | 113 ++++++++++++++++----
 .../pulsar/common/util/ObjectMapperFactory.java    |   3 +
 .../NonPersistentPartitionedTopicStatsTest.java    |  61 +++++++++++
 12 files changed, 452 insertions(+), 83 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4831994..681cdb0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1263,7 +1263,8 @@ public class PersistentTopicsBase extends AdminResource {
                         try {
                             stats.add(statFuture.get());
                             if (perPartition) {
-                                stats.partitions.put(topicName.getPartition(i).toString(), statFuture.get());
+                                stats.getPartitions().put(topicName.getPartition(i).toString(),
+                                        (TopicStatsImpl) statFuture.get());
                             }
                         } catch (Exception e) {
                             asyncResponse.resume(new RestException(e));
@@ -1276,7 +1277,7 @@ public class PersistentTopicsBase extends AdminResource {
                     try {
                         boolean zkPathExists = namespaceResources().getPartitionedTopicResources().exists(path);
                         if (zkPathExists) {
-                            stats.partitions.put(topicName.toString(), new TopicStatsImpl());
+                            stats.getPartitions().put(topicName.toString(), new TopicStatsImpl());
                         } else {
                             asyncResponse.resume(
                                     new RestException(Status.NOT_FOUND,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 9755b93..2cfb20f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -46,6 +46,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
@@ -57,6 +58,9 @@ import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TopicOperation;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
@@ -202,6 +206,118 @@ public class NonPersistentTopics extends PersistentTopics {
         }
     }
 
+
+    @GET
+    @Path("{tenant}/{namespace}/{topic}/partitioned-stats")
+    @ApiOperation(value = "Get the stats for the partitioned topic.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic does not exist"),
+            @ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
+    })
+    public void getPartitionedStats(
+            @Suspended final AsyncResponse asyncResponse,
+            @ApiParam(value = "Specify the tenant", required = true)
+            @PathParam("tenant") String tenant,
+            @ApiParam(value = "Specify the namespace", required = true)
+            @PathParam("namespace") String namespace,
+            @ApiParam(value = "Specify topic name", required = true)
+            @PathParam("topic") @Encoded String encodedTopic,
+            @ApiParam(value = "Get per partition stats")
+            @QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
+            @ApiParam(value = "Is authentication required to perform this operation")
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "If return precise backlog or imprecise backlog")
+            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
+            @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+                    + "not to use when there's heavy traffic.")
+            @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
+        try {
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            if (topicName.isGlobal()) {
+                try {
+                    validateGlobalNamespaceOwnership(namespaceName);
+                } catch (Exception e) {
+                    log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return;
+                }
+            }
+            getPartitionedTopicMetadataAsync(topicName,
+                    authoritative, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+                    return;
+                }
+                NonPersistentPartitionedTopicStatsImpl stats =
+                        new NonPersistentPartitionedTopicStatsImpl(partitionMetadata);
+                List<CompletableFuture<TopicStats>> topicStatsFutureList = Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        topicStatsFutureList
+                                .add(pulsar().getAdminClient().topics().getStatsAsync(
+                                        (topicName.getPartition(i).toString()), getPreciseBacklog,
+                                        subscriptionBacklogSize));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+                }
+
+                FutureUtil.waitForAll(topicStatsFutureList).handle((result, exception) -> {
+                    CompletableFuture<TopicStats> statFuture = null;
+                    for (int i = 0; i < topicStatsFutureList.size(); i++) {
+                        statFuture = topicStatsFutureList.get(i);
+                        if (statFuture.isDone() && !statFuture.isCompletedExceptionally()) {
+                            try {
+                                stats.add((NonPersistentTopicStatsImpl) statFuture.get());
+                                if (perPartition) {
+                                    stats.getPartitions().put(topicName.getPartition(i).toString(),
+                                            (NonPersistentTopicStatsImpl) statFuture.get());
+                                }
+                            } catch (Exception e) {
+                                asyncResponse.resume(new RestException(e));
+                                return null;
+                            }
+                        }
+                    }
+                    if (perPartition && stats.partitions.isEmpty()) {
+                        String path = ZkAdminPaths.partitionedTopicPath(topicName);
+                        try {
+                            boolean zkPathExists = namespaceResources().getPartitionedTopicResources().exists(path);
+                            if (zkPathExists) {
+                                stats.getPartitions().put(topicName.toString(), new NonPersistentTopicStatsImpl());
+                            } else {
+                                asyncResponse.resume(
+                                        new RestException(Status.NOT_FOUND,
+                                                "Internal topics have not been generated yet"));
+                                return null;
+                            }
+                        } catch (Exception e) {
+                            asyncResponse.resume(new RestException(e));
+                            return null;
+                        }
+                    }
+                    asyncResponse.resume(stats);
+                    return null;
+                });
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get partitioned stats for {}", clientAppId(), topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+
     @PUT
     @Path("/{tenant}/{namespace}/{topic}/unload")
     @ApiOperation(value = "Unload a topic")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index f342274..21ff33d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -918,12 +918,14 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
             fail("getPartitionedTopicMetadata of " + anotherTopic + " should not succeed");
         } catch (NotFoundException expected) {
         }
-        // check the getPartitionedStats for PartitionedTopic returns only partitions metadata, and no partitions info
+
+        PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName,false);
+
+                // check the getPartitionedStats for PartitionedTopic returns only partitions metadata, and no partitions info
         assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
-                admin.topics().getPartitionedStats(partitionedTopicName,false).getMetadata().partitions);
+                topicStats.getMetadata().partitions);
 
-        assertEquals(admin.topics().getPartitionedStats(partitionedTopicName, false).getPartitions().size(),
-                0);
+        assertEquals(topicStats.getPartitions().size(), 0);
 
         List<String> subscriptions = admin.topics().getSubscriptions(partitionedTopicName);
         assertEquals(subscriptions.size(), 0);
@@ -985,7 +987,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
                         partitionedTopicName + "-partition-2", partitionedTopicName + "-partition-3"));
 
         // test cumulative stats for partitioned topic
-        PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName, false);
+        topicStats = admin.topics().getPartitionedStats(partitionedTopicName,false);
         if (isPersistent) {
             // TODO: for non-persistent topics, the subscription doesn't exist
             assertEquals(topicStats.getSubscriptions().keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
@@ -996,7 +998,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         assertEquals(topicStats.getPartitions(), Maps.newHashMap());
 
         // test per partition stats for partitioned topic
-        topicStats = admin.topics().getPartitionedStats(partitionedTopicName, true);
+        topicStats = admin.topics().getPartitionedStats(partitionedTopicName,true);
         assertEquals(topicStats.getMetadata().partitions, 4);
         assertEquals(topicStats.getPartitions().keySet(),
                 Sets.newHashSet(partitionedTopicName + "-partition-0", partitionedTopicName + "-partition-1",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 0fc1cb6..ce6814c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -87,6 +87,7 @@ import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.FailureDomain;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -322,9 +323,9 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
     public void nonPersistentTopics() throws Exception {
         final String topicName = "nonPersistentTopic";
 
-        final String persistentTopicName = "non-persistent://prop-xyz/ns1/" + topicName;
+        final String nonPersistentTopicName = "non-persistent://prop-xyz/ns1/" + topicName;
         // Force to create a topic
-        publishMessagesOnTopic("non-persistent://prop-xyz/ns1/" + topicName, 0, 0);
+        publishMessagesOnTopic(nonPersistentTopicName, 0, 0);
 
         // create consumer and subscription
         @Cleanup
@@ -332,28 +333,27 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
                 .serviceUrl(pulsar.getWebServiceAddress())
                 .statsInterval(0, TimeUnit.SECONDS)
                 .build();
-        Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName("my-sub")
+        Consumer<byte[]> consumer = client.newConsumer().topic(nonPersistentTopicName).subscriptionName("my-sub")
                 .subscribe();
 
-        publishMessagesOnTopic("non-persistent://prop-xyz/ns1/" + topicName, 10, 0);
+        publishMessagesOnTopic(nonPersistentTopicName, 10, 0);
 
-        TopicStats topicStats = admin.topics().getStats(persistentTopicName);
+        NonPersistentTopicStats topicStats = (NonPersistentTopicStats) admin.topics().getStats(nonPersistentTopicName);
         assertEquals(topicStats.getSubscriptions().keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
         assertEquals(topicStats.getSubscriptions().get("my-sub").getConsumers().size(), 1);
+        assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgDropRate(), 0);
         assertEquals(topicStats.getPublishers().size(), 0);
+        assertEquals(topicStats.getMsgDropRate(), 0);
 
-        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName, false);
+        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(nonPersistentTopicName, false);
         assertEquals(internalStats.cursors.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
 
         consumer.close();
-
-        topicStats = admin.topics().getStats(persistentTopicName);
+        topicStats = (NonPersistentTopicStats) admin.topics().getStats(nonPersistentTopicName);
         assertTrue(topicStats.getSubscriptions().containsKey("my-sub"));
         assertEquals(topicStats.getPublishers().size(), 0);
-
         // test partitioned-topic
         final String partitionedTopicName = "non-persistent://prop-xyz/ns1/paritioned";
-        assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 0);
         admin.topics().createPartitionedTopic(partitionedTopicName, 5);
         assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 5);
     }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStats.java
similarity index 76%
copy from pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
copy to pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStats.java
index 76ed0e3..68be731 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStats.java
@@ -19,14 +19,12 @@
 package org.apache.pulsar.common.policies.data;
 
 import java.util.Map;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 
 /**
- * Statistics for a partitioned topic.
+ * Statistics for a non-persistent partitioned topic.
  */
-public interface PartitionedTopicStats extends TopicStats {
+public interface NonPersistentPartitionedTopicStats extends PartitionedTopicStats{
+    Map<String, ? extends NonPersistentTopicStats> getPartitions();
 
-    PartitionedTopicMetadata getMetadata();
-
-    Map<String, TopicStats> getPartitions();
-}
\ No newline at end of file
+    NonPersistentTopicStats add(NonPersistentTopicStats ts);
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
index 76ed0e3..f5ed8b2 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/PartitionedTopicStats.java
@@ -28,5 +28,7 @@ public interface PartitionedTopicStats extends TopicStats {
 
     PartitionedTopicMetadata getMetadata();
 
-    Map<String, TopicStats> getPartitions();
+    Map<String, ? extends TopicStats> getPartitions();
+
+    TopicStats add(TopicStats ts);
 }
\ No newline at end of file
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index bfbbe9f..7b518e2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -76,6 +76,8 @@ import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
@@ -707,19 +709,38 @@ public class TopicsImpl extends BaseResource implements Topics {
                 .queryParam("getPreciseBacklog", getPreciseBacklog)
                 .queryParam("subscriptionBacklogSize", subscriptionBacklogSize);
         final CompletableFuture<TopicStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<TopicStats>() {
 
-                    @Override
-                    public void completed(TopicStats response) {
-                        future.complete(response);
-                    }
+        InvocationCallback<TopicStats> persistentCB = new InvocationCallback<TopicStats>() {
+            @Override
+            public void completed(TopicStats response) {
+                future.complete(response);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        };
+
+        InvocationCallback<NonPersistentTopicStats> nonpersistentCB =
+                new InvocationCallback<NonPersistentTopicStats>() {
+            @Override
+            public void completed(NonPersistentTopicStats response) {
+                future.complete(response);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        };
+
+        if (topic.startsWith(TopicDomain.non_persistent.value())) {
+            asyncGetRequest(path, nonpersistentCB);
+        } else {
+            asyncGetRequest(path, persistentCB);
+        }
 
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
         return future;
     }
 
@@ -829,22 +850,45 @@ public class TopicsImpl extends BaseResource implements Topics {
                 .queryParam("getPreciseBacklog", getPreciseBacklog)
                 .queryParam("subscriptionBacklogSize", subscriptionBacklogSize);
         final CompletableFuture<PartitionedTopicStats> future = new CompletableFuture<>();
-        asyncGetRequest(path,
-                new InvocationCallback<PartitionedTopicStats>() {
 
-                    @Override
-                    public void completed(PartitionedTopicStats response) {
-                        if (!perPartition) {
-                            response.getPartitions().clear();
-                        }
-                        future.complete(response);
-                    }
+        InvocationCallback<NonPersistentPartitionedTopicStats> nonpersistentCB =
+                new InvocationCallback<NonPersistentPartitionedTopicStats>() {
 
-                    @Override
-                    public void failed(Throwable throwable) {
-                        future.completeExceptionally(getApiException(throwable.getCause()));
-                    }
-                });
+            @Override
+            public void completed(NonPersistentPartitionedTopicStats response) {
+                if (!perPartition) {
+                    response.getPartitions().clear();
+                }
+                future.complete(response);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        };
+
+        InvocationCallback<PartitionedTopicStats> persistentCB = new InvocationCallback<PartitionedTopicStats>() {
+
+            @Override
+            public void completed(PartitionedTopicStats response) {
+                if (!perPartition) {
+                    response.getPartitions().clear();
+                }
+                future.complete(response);
+            }
+
+            @Override
+            public void failed(Throwable throwable) {
+                future.completeExceptionally(getApiException(throwable.getCause()));
+            }
+        };
+
+        if (topic.startsWith(TopicDomain.non_persistent.value())) {
+            asyncGetRequest(path, nonpersistentCB);
+        } else {
+            asyncGetRequest(path, persistentCB);
+        }
         return future;
     }
 
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 2db9bc0..a41f656 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.admin.Bookies;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.Brokers;
@@ -1389,26 +1390,29 @@ public class PulsarAdminToolTest {
     @Test
     public void nonPersistentTopics() throws Exception {
         PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
-        NonPersistentTopics mockTopics = mock(NonPersistentTopics.class);
-        when(admin.nonPersistentTopics()).thenReturn(mockTopics);
+        Topics mockTopics = mock(Topics.class);
+        when(admin.topics()).thenReturn(mockTopics);
 
-        CmdNonPersistentTopics topics = new CmdNonPersistentTopics(() -> admin);
+        CmdTopics topics = new CmdTopics(() -> admin);
 
-        topics.run(split("stats non-persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).getStats("non-persistent://myprop/clust/ns1/ds1");
+        topics.run(split("stats non-persistent://myprop/ns1/ds1"));
+        verify(mockTopics).getStats("non-persistent://myprop/ns1/ds1", false, false);
 
-        topics.run(split("stats-internal non-persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).getInternalStats("non-persistent://myprop/clust/ns1/ds1");
+        topics.run(split("stats-internal non-persistent://myprop/ns1/ds1"));
+        verify(mockTopics).getInternalStats("non-persistent://myprop/ns1/ds1", false);
 
-        topics.run(split("create-partitioned-topic non-persistent://myprop/clust/ns1/ds1 --partitions 32"));
-        verify(mockTopics).createPartitionedTopic("non-persistent://myprop/clust/ns1/ds1", 32);
+        topics.run(split("create-partitioned-topic non-persistent://myprop/ns1/ds1 --partitions 32"));
+        verify(mockTopics).createPartitionedTopic("non-persistent://myprop/ns1/ds1", 32);
 
-        topics.run(split("list myprop/clust/ns1"));
-        verify(mockTopics).getList("myprop/clust/ns1");
+        topics.run(split("list myprop/ns1"));
+        verify(mockTopics).getList("myprop/ns1", null);
 
-        topics.run(split("list-in-bundle myprop/clust/ns1 --bundle 0x23d70a30_0x26666658"));
-        verify(mockTopics).getListInBundle("myprop/clust/ns1", "0x23d70a30_0x26666658");
+        NonPersistentTopics mockNonPersistentTopics = mock(NonPersistentTopics.class);
+        when(admin.nonPersistentTopics()).thenReturn(mockNonPersistentTopics);
 
+        CmdNonPersistentTopics nonPersistentTopics = new CmdNonPersistentTopics(() -> admin);
+        nonPersistentTopics.run(split("list-in-bundle myprop/clust/ns1 --bundle 0x23d70a30_0x26666658"));
+        verify(mockNonPersistentTopics).getListInBundle("myprop/clust/ns1", "0x23d70a30_0x26666658");
     }
 
     @Test
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
new file mode 100644
index 0000000..748e634
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentPartitionedTopicStatsImpl.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Statistics for a non-persistent partitioned topic.
+ */
+@SuppressFBWarnings("EQ_DOESNT_OVERRIDE_EQUALS")
+public class NonPersistentPartitionedTopicStatsImpl extends NonPersistentTopicStatsImpl
+        implements NonPersistentPartitionedTopicStats {
+
+    @Getter
+    public PartitionedTopicMetadata metadata;
+
+    @Getter
+    public Map<String, NonPersistentTopicStatsImpl> partitions;
+
+    public NonPersistentPartitionedTopicStatsImpl() {
+        super();
+        metadata = new PartitionedTopicMetadata();
+        partitions = new HashMap<>();
+    }
+
+    public NonPersistentPartitionedTopicStatsImpl(PartitionedTopicMetadata metadata) {
+        this();
+        this.metadata = metadata;
+    }
+
+    @Override
+    public void reset() {
+        super.reset();
+        partitions.clear();
+        metadata.partitions = 0;
+    }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
index 06f59cb..51f350d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/NonPersistentTopicStatsImpl.java
@@ -18,15 +18,14 @@
  */
 package org.apache.pulsar.common.policies.data.stats;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import lombok.Data;
 import lombok.Getter;
 import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
 import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
 import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
-import org.apache.pulsar.common.policies.data.PublisherStats;
-import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -47,31 +46,52 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
     @Getter
     public double msgDropRate;
 
-    /** List of connected publishers on this topic w/ their stats. */
-    @Getter
-    public List<? extends NonPersistentPublisherStats> publishers;
+    @JsonIgnore
+    public List<PublisherStatsImpl> publishers;
 
-    /** Map of subscriptions with their individual statistics. */
-    @Getter
-    public Map<String, ? extends NonPersistentSubscriptionStats> subscriptions;
+    @JsonIgnore
+    public Map<String, SubscriptionStatsImpl> subscriptions;
 
-    /** Map of replication statistics by remote cluster context. */
-    @Getter
-    public Map<String, ? extends NonPersistentReplicatorStats> replication;
+    @JsonIgnore
+    public Map<String, ReplicatorStatsImpl> replication;
+
+    @JsonProperty("publishers")
+    public List<NonPersistentPublisherStats> getNonPersistentPublishers() {
+        return (List<NonPersistentPublisherStats>) nonPersistentPublishers;
+    }
+
+    @JsonProperty("subscriptions")
+    public Map<String, NonPersistentSubscriptionStats> getNonPersistentSubscriptions() {
+        return (Map<String, NonPersistentSubscriptionStats>) nonPersistentSubscriptions;
+    }
+
+    @JsonProperty("replication")
+    public Map<String, NonPersistentReplicatorStats> getNonPersistentReplicators() {
+        return (Map<String, NonPersistentReplicatorStats>) nonPersistentReplicators;
+    }
+
+    /** List of connected publishers on this non-persistent topic w/ their stats. */
+    public List<? extends NonPersistentPublisherStats> nonPersistentPublishers;
+
+    /** Map of non-persistent subscriptions with their individual statistics. */
+    public Map<String, ? extends NonPersistentSubscriptionStats> nonPersistentSubscriptions;
+
+    /** Map of non-persistent replication statistics by remote cluster context. */
+    public Map<String, ? extends NonPersistentReplicatorStats> nonPersistentReplicators;
 
     @SuppressFBWarnings(value = "MF_CLASS_MASKS_FIELD", justification = "expected to override")
     public List<NonPersistentPublisherStats> getPublishers() {
-        return (List<NonPersistentPublisherStats>) publishers;
+        return (List<NonPersistentPublisherStats>) nonPersistentPublishers;
     }
 
     @SuppressFBWarnings(value = "MF_CLASS_MASKS_FIELD", justification = "expected to override")
     public Map<String, NonPersistentSubscriptionStats> getSubscriptions() {
-        return (Map<String, NonPersistentSubscriptionStats>) subscriptions;
+        return (Map<String, NonPersistentSubscriptionStats>) nonPersistentSubscriptions;
     }
 
     @SuppressFBWarnings(value = "MF_CLASS_MASKS_FIELD", justification = "expected to override")
     public Map<String, NonPersistentReplicatorStats> getReplication() {
-        return (Map<String, NonPersistentReplicatorStats>) replication;
+        return (Map<String, NonPersistentReplicatorStats>) nonPersistentReplicators;
     }
 
     @Override
@@ -80,22 +100,77 @@ public class NonPersistentTopicStatsImpl extends TopicStatsImpl implements NonPe
     }
 
     public NonPersistentTopicStatsImpl() {
-        this.publishers = new ArrayList<>();
-        this.subscriptions = new HashMap<>();
-        this.replication = new TreeMap<>();
+        this.nonPersistentPublishers = new ArrayList<>();
+        this.nonPersistentSubscriptions = new HashMap<>();
+        this.nonPersistentReplicators = new TreeMap<>();
     }
 
     public void reset() {
         super.reset();
+        this.nonPersistentPublishers.clear();
+        this.nonPersistentSubscriptions.clear();
+        this.nonPersistentReplicators.clear();
         this.msgDropRate = 0;
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
     // stats.
-    public NonPersistentTopicStatsImpl add(NonPersistentTopicStatsImpl stats) {
+    public NonPersistentTopicStatsImpl add(NonPersistentTopicStats ts) {
+        NonPersistentTopicStatsImpl stats = (NonPersistentTopicStatsImpl) ts;
         Objects.requireNonNull(stats);
         super.add(stats);
         this.msgDropRate += stats.msgDropRate;
+
+        if (this.getNonPersistentPublishers().size() != stats.getNonPersistentPublishers().size()) {
+            for (int i = 0; i < stats.getNonPersistentPublishers().size(); i++) {
+                NonPersistentPublisherStatsImpl publisherStats = new NonPersistentPublisherStatsImpl();
+                this.getNonPersistentPublishers().add(publisherStats
+                        .add((NonPersistentPublisherStatsImpl) stats.getNonPersistentPublishers().get(i)));
+            }
+        } else {
+            for (int i = 0; i < stats.getNonPersistentPublishers().size(); i++) {
+                ((NonPersistentPublisherStatsImpl) this.getNonPersistentPublishers().get(i))
+                        .add((NonPersistentPublisherStatsImpl) stats.getNonPersistentPublishers().get(i));
+            }
+        }
+
+        if (this.getNonPersistentSubscriptions().size() != stats.getNonPersistentSubscriptions().size()) {
+            for (String subscription : stats.getNonPersistentSubscriptions().keySet()) {
+                NonPersistentSubscriptionStatsImpl subscriptionStats = new NonPersistentSubscriptionStatsImpl();
+                this.getNonPersistentSubscriptions().put(subscription, subscriptionStats
+                        .add((NonPersistentSubscriptionStatsImpl) stats.getNonPersistentSubscriptions().get(subscription)));
+            }
+        } else {
+            for (String subscription : stats.getNonPersistentSubscriptions().keySet()) {
+                if (this.getNonPersistentSubscriptions().get(subscription) != null) {
+                    ((NonPersistentSubscriptionStatsImpl) this.getNonPersistentSubscriptions().get(subscription))
+                          .add((NonPersistentSubscriptionStatsImpl) stats.getNonPersistentSubscriptions().get(subscription));
+                } else {
+                    NonPersistentSubscriptionStatsImpl subscriptionStats = new NonPersistentSubscriptionStatsImpl();
+                    this.getNonPersistentSubscriptions().put(subscription, subscriptionStats
+                         .add((NonPersistentSubscriptionStatsImpl) stats.getNonPersistentSubscriptions().get(subscription)));
+                }
+            }
+        }
+
+        if (this.getNonPersistentReplicators().size() != stats.getNonPersistentReplicators().size()) {
+            for (String repl : stats.getNonPersistentReplicators().keySet()) {
+                NonPersistentReplicatorStatsImpl replStats = new NonPersistentReplicatorStatsImpl();
+                this.getNonPersistentReplicators().put(repl, replStats
+                        .add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl)));
+            }
+        } else {
+            for (String repl : stats.getNonPersistentReplicators().keySet()) {
+                if (this.getNonPersistentReplicators().get(repl) != null) {
+                    ((NonPersistentReplicatorStatsImpl) this.getNonPersistentReplicators().get(repl))
+                            .add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl));
+                } else {
+                    NonPersistentReplicatorStatsImpl replStats = new NonPersistentReplicatorStatsImpl();
+                    this.getNonPersistentReplicators().put(repl, replStats
+                            .add((NonPersistentReplicatorStatsImpl) stats.getNonPersistentReplicators().get(repl)));
+                }
+            }
+        }
         return this;
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index ffab615..94e1b7a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -63,6 +63,7 @@ import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
 import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
+import org.apache.pulsar.common.policies.data.NonPersistentPartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
 import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
 import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
@@ -90,6 +91,7 @@ import org.apache.pulsar.common.policies.data.impl.BundlesDataImpl;
 import org.apache.pulsar.common.policies.data.impl.DelayedDeliveryPoliciesImpl;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
@@ -183,6 +185,7 @@ public class ObjectMapperFactory {
         resolver.addMapping(NonPersistentSubscriptionStats.class, NonPersistentSubscriptionStatsImpl.class);
         resolver.addMapping(NonPersistentTopicStats.class, NonPersistentTopicStatsImpl.class);
         resolver.addMapping(PartitionedTopicStats.class, PartitionedTopicStatsImpl.class);
+        resolver.addMapping(NonPersistentPartitionedTopicStats.class, NonPersistentPartitionedTopicStatsImpl.class);
         resolver.addMapping(PublisherStats.class, PublisherStatsImpl.class);
         resolver.addMapping(ReplicatorStats.class, ReplicatorStatsImpl.class);
         resolver.addMapping(SubscriptionStats.class, SubscriptionStatsImpl.class);
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
new file mode 100644
index 0000000..5135acf
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/NonPersistentPartitionedTopicStatsTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import org.apache.pulsar.common.policies.data.stats.NonPersistentPartitionedTopicStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class NonPersistentPartitionedTopicStatsTest {
+
+    @Test
+    public void testPartitionedTopicStats() {
+        NonPersistentPartitionedTopicStatsImpl nonPersistentPartitionedTopicStats = new NonPersistentPartitionedTopicStatsImpl();
+        nonPersistentPartitionedTopicStats.msgRateIn = 1;
+        nonPersistentPartitionedTopicStats.msgThroughputIn = 1;
+        nonPersistentPartitionedTopicStats.msgRateOut = 1;
+        nonPersistentPartitionedTopicStats.msgThroughputOut = 1;
+        nonPersistentPartitionedTopicStats.averageMsgSize = 1;
+        nonPersistentPartitionedTopicStats.storageSize = 1;
+        nonPersistentPartitionedTopicStats.getPublishers().add(new NonPersistentPublisherStatsImpl());
+        nonPersistentPartitionedTopicStats.getSubscriptions().put("test_ns", new NonPersistentSubscriptionStatsImpl());
+        nonPersistentPartitionedTopicStats.getReplication().put("test_ns", new NonPersistentReplicatorStatsImpl());
+        nonPersistentPartitionedTopicStats.metadata.partitions = 1;
+        nonPersistentPartitionedTopicStats.partitions.put("test", nonPersistentPartitionedTopicStats);
+        nonPersistentPartitionedTopicStats.reset();
+        assertEquals(nonPersistentPartitionedTopicStats.msgRateIn, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.msgThroughputIn, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.msgRateOut, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.msgThroughputOut, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.averageMsgSize, 0.0);
+        assertEquals(nonPersistentPartitionedTopicStats.storageSize, 0);
+        assertEquals(nonPersistentPartitionedTopicStats.getPublishers().size(), 0);
+        assertEquals(nonPersistentPartitionedTopicStats.getSubscriptions().size(), 0);
+        assertEquals(nonPersistentPartitionedTopicStats.getReplication().size(), 0);
+        assertEquals(nonPersistentPartitionedTopicStats.metadata.partitions, 0);
+        assertEquals(nonPersistentPartitionedTopicStats.partitions.size(), 0);
+    }
+}