You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:56:59 UTC

[pulsar] 05/27: Add subscription backlog size info for topicstats. (#9302)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bc18625491e767d467ad6a1a989a326f5e3a733a
Author: Marvin Cai <zx...@streamnative.io>
AuthorDate: Sun Feb 7 20:49:38 2021 -0800

    Add subscription backlog size info for topicstats. (#9302)
    
    Fixes #9254
    
    Add ability to fetch backlog size for subscription, add flag in topic-stats partitioned-topic-stats for getting backlog size for subscriptions.
    Sample output
    ```
    ./pulsar-admin topics partitioned-stats zxc-t/zxc-ns/zxc-p -gpb -sbs
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesInCounter" : 0,
      "msgInCounter" : 0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "averageMsgSize" : 0.0,
      "msgChunkPublished" : false,
      "storageSize" : 875,
      "backlogSize" : 585,
      "publishers" : [ ],
      "waitingPublishers" : 0,
      "subscriptions" : {
        "zxc-sub-1" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "bytesOutCounter" : 0,
          "msgOutCounter" : 0,
          "msgRateRedeliver" : 0.0,
          "chuckedMessageRate" : 0,
          "msgBacklog" : 10,
          "backlogSize" : 585,
          "msgBacklogNoDelayed" : 10,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "msgRateExpired" : 0.0,
          "totalMsgExpired" : 0,
          "lastExpireTimestamp" : 0,
          "lastConsumedFlowTimestamp" : 0,
          "lastConsumedTimestamp" : 0,
          "lastAckedTimestamp" : 0,
          "lastMarkDeleteAdvancedTimestamp" : 0,
          "consumers" : [ ],
          "isDurable" : true,
          "isReplicated" : false,
          "consumersAfterMarkDeletePosition" : { },
          "nonContiguousDeletedMessagesRanges" : 0,
          "nonContiguousDeletedMessagesRangesSerializedSize" : 0
        },
        "zxc-sub-2" : {
          "msgRateOut" : 0.0,
          "msgThroughputOut" : 0.0,
          "bytesOutCounter" : 0,
          "msgOutCounter" : 0,
          "msgRateRedeliver" : 0.0,
          "chuckedMessageRate" : 0,
          "msgBacklog" : 5,
          "backlogSize" : 295,
          "msgBacklogNoDelayed" : 5,
          "blockedSubscriptionOnUnackedMsgs" : false,
          "msgDelayed" : 0,
          "unackedMessages" : 0,
          "msgRateExpired" : 0.0,
          "totalMsgExpired" : 0,
          "lastExpireTimestamp" : 0,
          "lastConsumedFlowTimestamp" : 0,
          "lastConsumedTimestamp" : 0,
          "lastAckedTimestamp" : 0,
          "lastMarkDeleteAdvancedTimestamp" : 0,
          "consumers" : [ ],
          "isDurable" : true,
          "isReplicated" : false,
          "consumersAfterMarkDeletePosition" : { },
          "nonContiguousDeletedMessagesRanges" : 0,
          "nonContiguousDeletedMessagesRangesSerializedSize" : 0
        }
      },
      "replication" : { },
      "nonContiguousDeletedMessagesRanges" : 0,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 0,
      "metadata" : {
        "partitions" : 5
      },
      "partitions" : { }
    }
    ```
    
    In ManagedLedgerImpl add API to get backlog size starting from specific position.
    In Admin Rest API and CLI add option to get subscription backlog.
    
    This change added tests and can be verified as follows:
    Added check in existing test to verify backlog size.
    
    (cherry picked from commit ab94743154d367af8cbcfd55916e0a2680073237)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 10 ++++++
 .../apache/pulsar/broker/ServiceConfiguration.java |  7 ++++
 .../broker/admin/impl/PersistentTopicsBase.java    | 11 ++++---
 .../broker/admin/v1/NonPersistentTopics.java       |  2 +-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  4 +--
 .../broker/admin/v2/NonPersistentTopics.java       |  9 +++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 19 +++++++----
 .../pulsar/broker/service/AbstractTopic.java       |  4 +--
 .../pulsar/broker/service/BrokerService.java       |  2 +-
 .../org/apache/pulsar/broker/service/Topic.java    |  2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  2 +-
 .../service/persistent/PersistentSubscription.java |  6 +++-
 .../broker/service/persistent/PersistentTopic.java |  4 +--
 .../stats/prometheus/NamespaceStatsAggregator.java | 11 ++++---
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 19 +++++++----
 .../pulsar/broker/service/BrokerServiceTest.java   | 18 +++++-----
 .../client/api/DispatcherBlockConsumerTest.java    |  4 +--
 .../pulsar/client/api/NonPersistentTopicTest.java  | 10 +++---
 .../pulsar/client/impl/MessageChunkingTest.java    |  2 +-
 .../org/apache/pulsar/client/admin/Topics.java     | 38 ++++++++++++++++------
 .../pulsar/client/admin/internal/TopicsImpl.java   | 24 +++++++++-----
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  4 +--
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 14 ++++++--
 .../common/policies/data/SubscriptionStats.java    |  5 +++
 24 files changed, 157 insertions(+), 74 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8dc3ea6..9694f2d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1023,6 +1023,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    /**
+     * Get estimated backlog size from a specific position.
+     */
+    public long getEstimatedBacklogSize(PositionImpl pos) {
+        if (pos == null) {
+            return 0;
+        }
+        return estimateBacklogFromPosition(pos);
+    }
+
     long estimateBacklogFromPosition(PositionImpl pos) {
         synchronized (this) {
             LedgerInfo ledgerInfo = ledgers.get(pos.getLedgerId());
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 584d5a8..a2e5452 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1800,6 +1800,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private boolean exposePreciseBacklogInPrometheus = false;
 
+    @FieldContext(
+            category = CATEGORY_METRICS,
+            doc = "Enable expose the backlog size for each subscription when generating stats.\n" +
+                    " Locking is used for fetching the status so default to false."
+    )
+    private boolean exposeSubscriptionBacklogSizeInPrometheus = false;
+
     /**** --- Functions --- ****/
     @FieldContext(
         category = CATEGORY_FUNCTIONS,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index fb2e7a1..61a8310 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1077,14 +1077,15 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog) {
+    protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog,
+                                          boolean subscriptionBacklogSize) {
         validateAdminAndClientPermission();
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
         validateTopicOwnership(topicName, authoritative);
         Topic topic = getTopicReference(topicName);
-        return topic.getStats(getPreciseBacklog);
+        return topic.getStats(getPreciseBacklog, subscriptionBacklogSize);
     }
 
     protected PersistentTopicInternalStats internalGetInternalStats(boolean authoritative, boolean metadata) {
@@ -1199,7 +1200,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
-            boolean perPartition, boolean getPreciseBacklog) {
+            boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
         if (topicName.isGlobal()) {
             try {
                 validateGlobalNamespaceOwnership(namespaceName);
@@ -1219,7 +1220,9 @@ public class PersistentTopicsBase extends AdminResource {
             for (int i = 0; i < partitionMetadata.partitions; i++) {
                 try {
                     topicStatsFutureList
-                            .add(pulsar().getAdminClient().topics().getStatsAsync((topicName.getPartition(i).toString()), getPreciseBacklog));
+                            .add(pulsar().getAdminClient().topics().getStatsAsync(
+                                    (topicName.getPartition(i).toString()), getPreciseBacklog,
+                                    subscriptionBacklogSize));
                 } catch (PulsarServerException e) {
                     asyncResponse.resume(new RestException(e));
                     return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index b2aa364..06a9f92 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -102,7 +102,7 @@ public class NonPersistentTopics extends PersistentTopics {
         validateTopicName(property, cluster, namespace, encodedTopic);
         validateAdminOperationOnTopic(authoritative);
         Topic topic = getTopicReference(topicName);
-        return ((NonPersistentTopic) topic).getStats(false);
+        return ((NonPersistentTopic) topic).getStats(false, false);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 663b745..d33cac0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -293,7 +293,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetStats(authoritative, false);
+        return internalGetStats(authoritative, false, false);
     }
 
     @GET
@@ -339,7 +339,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         try {
             validateTopicName(property, cluster, namespace, encodedTopic);
-            internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false);
+            internalGetPartitionedStats(asyncResponse, authoritative, perPartition, false, false);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 3805cf0..769e60d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -116,12 +116,15 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @ApiParam(value = "Is return precise backlog or imprecise backlog")
-            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
+            @ApiParam(value = "If return precise backlog or imprecise backlog")
+            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
+            @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+                    + "not to use when there's heavy traffic.")
+            @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
         validateTopicName(tenant, namespace, encodedTopic);
         validateAdminOperationOnTopic(topicName, authoritative);
         Topic topic = getTopicReference(topicName);
-        return ((NonPersistentTopic) topic).getStats(getPreciseBacklog);
+        return ((NonPersistentTopic) topic).getStats(getPreciseBacklog, subscriptionBacklogSize);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 18ab1b0..861fff4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -891,10 +891,13 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @ApiParam(value = "Is return precise backlog or imprecise backlog")
-            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
+            @ApiParam(value = "If return precise backlog or imprecise backlog")
+            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
+            @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+                    + "not to use when there's heavy traffic.")
+            @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalGetStats(authoritative, getPreciseBacklog);
+        return internalGetStats(authoritative, getPreciseBacklog, subscriptionBacklogSize);
     }
 
     @GET
@@ -970,11 +973,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @ApiParam(value = "Is return precise backlog or imprecise backlog")
-            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog) {
+            @ApiParam(value = "If return precise backlog or imprecise backlog")
+            @QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
+            @ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+                    + "not to use when there's heavy traffic.")
+            @QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
         try {
             validatePartitionedTopicName(tenant, namespace, encodedTopic);
-            internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog);
+            internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog,
+                    subscriptionBacklogSize);
         } catch (WebApplicationException wae) {
             asyncResponse.resume(wae);
         } catch (Exception e) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 2cab994..0d40ecb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -521,11 +521,11 @@ public abstract class AbstractTopic implements Topic {
     }
 
     public long getMsgOutCounter() {
-        return getStats(false).msgOutCounter;
+        return getStats(false, false).msgOutCounter;
     }
 
     public long getBytesOutCounter() {
-        return getStats(false).bytesOutCounter;
+        return getStats(false, false).bytesOutCounter;
     }
 
     public boolean isDeleteWhileInactive() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 30bd5b2..72d6b50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1689,7 +1689,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     public Map<String, TopicStats> getTopicStats() {
         HashMap<String, TopicStats> stats = new HashMap<>();
 
-        forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false)));
+        forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, false)));
 
         return stats;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 660be0c..38bc584 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -181,7 +181,7 @@ public interface Topic {
 
     ConcurrentOpenHashMap<String, ? extends Replicator> getReplicators();
 
-    TopicStats getStats(boolean getPreciseBacklog);
+    TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize);
 
     CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 340df37..16f99a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -769,7 +769,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
     }
 
     @Override
-    public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
+    public NonPersistentTopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
 
         NonPersistentTopicStats stats = new NonPersistentTopicStats();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index da910a8..1aeced3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -902,7 +902,7 @@ public class PersistentSubscription implements Subscription {
         return cursor.getEstimatedSizeSinceMarkDeletePosition();
     }
 
-    public SubscriptionStats getStats(Boolean getPreciseBacklog) {
+    public SubscriptionStats getStats(Boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
         SubscriptionStats subStats = new SubscriptionStats();
         subStats.lastExpireTimestamp = lastExpireTimestamp;
         subStats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
@@ -945,6 +945,10 @@ public class PersistentSubscription implements Subscription {
             }
         }
         subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
+        if (subscriptionBacklogSize) {
+            subStats.backlogSize = ((ManagedLedgerImpl) topic.getManagedLedger())
+                    .getEstimatedBacklogSize((PositionImpl) cursor.getMarkDeletedPosition());
+        }
         subStats.msgBacklogNoDelayed = subStats.msgBacklog - subStats.msgDelayed;
         subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();
         subStats.totalMsgExpired = expiryMonitor.getTotalMessageExpired();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 5ad3394..20733e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1579,7 +1579,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     }
 
     @Override
-    public TopicStats getStats(boolean getPreciseBacklog) {
+    public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
 
         TopicStats stats = new TopicStats();
 
@@ -1603,7 +1603,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         stats.msgChunkPublished = this.msgChunkPublished;
 
         subscriptions.forEach((name, subscription) -> {
-            SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);
+            SubscriptionStats subStats = subscription.getStats(getPreciseBacklog, subscriptionBacklogSize);
 
             stats.msgRateOut += subStats.msgRateOut;
             stats.msgThroughputOut += subStats.msgThroughputOut;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 3364214..cf89ea2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -62,8 +62,9 @@ public class NamespaceStatsAggregator {
 
             bundlesMap.forEach((bundle, topicsMap) -> {
                 topicsMap.forEach((name, topic) -> {
-                    getTopicStats(topic, topicStats, includeConsumerMetrics, pulsar.getConfiguration().isExposePreciseBacklogInPrometheus());
-
+                    getTopicStats(topic, topicStats, includeConsumerMetrics,
+                            pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
+                            pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus());
                     if (includeTopicMetrics) {
                         topicsCount.add(1);
                         TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats);
@@ -83,7 +84,8 @@ public class NamespaceStatsAggregator {
         });
     }
 
-    private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics, boolean getPreciseBacklog) {
+    private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
+                                      boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
         stats.reset();
 
         if (topic instanceof PersistentTopic) {
@@ -108,7 +110,8 @@ public class NamespaceStatsAggregator {
             stats.storageReadRate = mlStats.getReadEntriesRate();
         }
 
-        org.apache.pulsar.common.policies.data.TopicStats tStatus = topic.getStats(getPreciseBacklog);
+        org.apache.pulsar.common.policies.data.TopicStats tStatus = topic.getStats(getPreciseBacklog,
+                subscriptionBacklogSize);
         stats.msgInCounter = tStatus.msgInCounter;
         stats.bytesInCounter = tStatus.bytesInCounter;
         stats.msgOutCounter = tStatus.msgOutCounter;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 5c177da..f0c6a25 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -1156,7 +1156,8 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         TopicStats topicStats = admin.topics().getStats(topic);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
 
-        topicStats = admin.topics().getStats(topic, true);
+        topicStats = admin.topics().getStats(topic, true, true);
+        assertEquals(topicStats.subscriptions.get(subName).backlogSize, 43);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1);
         consumer.acknowledge(message);
 
@@ -1164,7 +1165,8 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         Thread.sleep(500);
 
         // Consumer acks the message, so the precise backlog is 0
-        topicStats = admin.topics().getStats(topic, true);
+        topicStats = admin.topics().getStats(topic, true, true);
+        assertEquals(topicStats.subscriptions.get(subName).backlogSize, 0);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 0);
 
         topicStats = admin.topics().getStats(topic);
@@ -1209,7 +1211,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         // not yet guaranteed to see the stats updated.
         Thread.sleep(500);
 
-        TopicStats topicStats = admin.topics().getStats(topic, true);
+        TopicStats topicStats = admin.topics().getStats(topic, true, true);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5);
 
@@ -1218,7 +1220,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         }
         // Wait the ack send.
         Thread.sleep(500);
-        topicStats = admin.topics().getStats(topic, true);
+        topicStats = admin.topics().getStats(topic, true, true);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
     }
@@ -1260,8 +1262,9 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         TopicStats topicStats = admin.topics().getPartitionedStats(topic, false);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 20);
 
-        topicStats = admin.topics().getPartitionedStats(topic, false, true);
+        topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 1);
+        assertEquals(topicStats.subscriptions.get(subName).backlogSize, 43);
     }
 
     @Test(timeOut = 30000)
@@ -1298,8 +1301,9 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
             }
         }
 
-        TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true);
+        TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 10);
+        assertEquals(topicStats.subscriptions.get(subName).backlogSize, 470);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 5);
 
         for (int i = 0; i < 5; i++) {
@@ -1307,8 +1311,9 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         }
         // Wait the ack send.
         Thread.sleep(500);
-        topicStats = admin.topics().getPartitionedStats(topic, false, true);
+        topicStats = admin.topics().getPartitionedStats(topic, false, true, true);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklog, 5);
+        assertEquals(topicStats.subscriptions.get(subName).backlogSize, 238);
         assertEquals(topicStats.subscriptions.get(subName).msgBacklogNoDelayed, 0);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 5688f97..9cbc624 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -161,7 +161,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertNotNull(topicRef);
 
         rolloverPerIntervalStats();
-        stats = topicRef.getStats(false);
+        stats = topicRef.getStats(false, false);
         subStats = stats.subscriptions.values().iterator().next();
 
         // subscription stats
@@ -182,7 +182,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
 
         rolloverPerIntervalStats();
-        stats = topicRef.getStats(false);
+        stats = topicRef.getStats(false, false);
         subStats = stats.subscriptions.values().iterator().next();
 
         // publisher stats
@@ -220,7 +220,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
 
         rolloverPerIntervalStats();
-        stats = topicRef.getStats(false);
+        stats = topicRef.getStats(false, false);
         subStats = stats.subscriptions.values().iterator().next();
         assertEquals(stats.offloadedStorageSize, 0);
 
@@ -234,13 +234,13 @@ public class BrokerServiceTest extends BrokerTestBase {
         PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
 
         assertNotNull(topicRef);
-        assertEquals(topicRef.getStats(false).storageSize, 0);
+        assertEquals(topicRef.getStats(false, false).storageSize, 0);
 
         for (int i = 0; i < 10; i++) {
             producer.send(new byte[10]);
         }
 
-        assertTrue(topicRef.getStats(false).storageSize > 0);
+        assertTrue(topicRef.getStats(false, false).storageSize > 0);
     }
 
     @Test
@@ -259,7 +259,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         assertNotNull(topicRef);
 
         rolloverPerIntervalStats();
-        stats = topicRef.getStats(false);
+        stats = topicRef.getStats(false, false);
         subStats = stats.subscriptions.values().iterator().next();
 
         // subscription stats
@@ -277,7 +277,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
 
         rolloverPerIntervalStats();
-        stats = topicRef.getStats(false);
+        stats = topicRef.getStats(false, false);
         subStats = stats.subscriptions.values().iterator().next();
 
         // publisher stats
@@ -312,7 +312,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
 
         rolloverPerIntervalStats();
-        stats = topicRef.getStats(false);
+        stats = topicRef.getStats(false, false);
         subStats = stats.subscriptions.values().iterator().next();
         assertTrue(subStats.msgRateRedeliver > 0.0);
         assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver);
@@ -326,7 +326,7 @@ public class BrokerServiceTest extends BrokerTestBase {
         Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
 
         rolloverPerIntervalStats();
-        stats = topicRef.getStats(false);
+        stats = topicRef.getStats(false, false);
         subStats = stats.subscriptions.values().iterator().next();
 
         assertEquals(subStats.msgBacklog, 0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
index 80bf968..f4783f5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java
@@ -543,7 +543,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
             assertNotNull(topicRef);
 
             rolloverPerIntervalStats();
-            stats = topicRef.getStats(false);
+            stats = topicRef.getStats(false, false);
             subStats = stats.subscriptions.values().iterator().next();
 
             // subscription stats
@@ -561,7 +561,7 @@ public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
             Thread.sleep(timeWaitToSync);
 
             rolloverPerIntervalStats();
-            stats = topicRef.getStats(false);
+            stats = topicRef.getStats(false, false);
             subStats = stats.subscriptions.values().iterator().next();
 
             assertTrue(subStats.msgBacklog > 0);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index 95bd4d9..bf844d5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -437,7 +437,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         assertNotNull(topicRef);
 
         rolloverPerIntervalStats(pulsar);
-        stats = topicRef.getStats(false);
+        stats = topicRef.getStats(false, false);
         subStats = stats.getSubscriptions().values().iterator().next();
 
         // subscription stats
@@ -455,7 +455,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
         Thread.sleep(timeWaitToSync);
 
         rolloverPerIntervalStats(pulsar);
-        stats = topicRef.getStats(false);
+        stats = topicRef.getStats(false, false);
         subStats = stats.getSubscriptions().values().iterator().next();
 
         assertTrue(subStats.msgRateOut > 0);
@@ -519,7 +519,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             assertNotNull(replicatorR3);
 
             rolloverPerIntervalStats(replicationPulasr);
-            stats = topicRef.getStats(false);
+            stats = topicRef.getStats(false, false);
             subStats = stats.getSubscriptions().values().iterator().next();
 
             // subscription stats
@@ -590,7 +590,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             Thread.sleep(timeWaitToSync);
 
             rolloverPerIntervalStats(replicationPulasr);
-            stats = topicRef.getStats(false);
+            stats = topicRef.getStats(false, false);
             subStats = stats.getSubscriptions().values().iterator().next();
 
             assertTrue(subStats.msgRateOut > 0);
@@ -809,7 +809,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
 
             NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
             pulsar.getBrokerService().updateRates();
-            NonPersistentTopicStats stats = topic.getStats(false);
+            NonPersistentTopicStats stats = topic.getStats(false, false);
             NonPersistentPublisherStats npStats = stats.getPublishers().get(0);
             NonPersistentSubscriptionStats sub1Stats = stats.getSubscriptions().get("subscriber-1");
             NonPersistentSubscriptionStats sub2Stats = stats.getSubscriptions().get("subscriber-2");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 242bfc8..37c1f40 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -133,7 +133,7 @@ public class MessageChunkingTest extends ProducerConsumerBase {
 
         pulsar.getBrokerService().updateRates();
 
-        PublisherStats producerStats = topic.getStats(false).publishers.get(0);
+        PublisherStats producerStats = topic.getStats(false, false).publishers.get(0);
 
         assertTrue(producerStats.chunkedMessageRate > 0);
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 4c8a8ffa..69d4600 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -755,6 +755,8 @@ public interface Topics {
      *            topic name
      * @param getPreciseBacklog
      *            Set to true to get precise backlog, Otherwise get imprecise backlog.
+     * @param subscriptionBacklogSize
+     *            Whether to get backlog size for each subscription.
      * @return the topic statistics
      *
      * @throws NotAuthorizedException
@@ -764,10 +766,15 @@ public interface Topics {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException;
+    TopicStats getStats(String topic, boolean getPreciseBacklog,
+                        boolean subscriptionBacklogSize) throws PulsarAdminException;
+
+    default TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException {
+        return getStats(topic, getPreciseBacklog, false);
+    }
 
     default TopicStats getStats(String topic) throws PulsarAdminException {
-        return getStats(topic, false);
+        return getStats(topic, false, false);
     }
 
     /**
@@ -778,14 +785,16 @@ public interface Topics {
      *            topic name
      * @param getPreciseBacklog
      *            Set to true to get precise backlog, Otherwise get imprecise backlog.
-     *
+     * @param subscriptionBacklogSize
+     *            Whether to get backlog size for each subscription.
      * @return a future that can be used to track when the topic statistics are returned
      *
      */
-    CompletableFuture<TopicStats> getStatsAsync(String topic, boolean getPreciseBacklog);
+    CompletableFuture<TopicStats> getStatsAsync(String topic, boolean getPreciseBacklog,
+                                                boolean subscriptionBacklogSize);
 
     default CompletableFuture<TopicStats> getStatsAsync(String topic) {
-        return getStatsAsync(topic, false);
+        return getStatsAsync(topic, false, false);
     }
 
     /**
@@ -936,7 +945,11 @@ public interface Topics {
      * @param topic
      *            topic name
      * @param perPartition
-     *
+     *            flag to get stats per partition
+     * @param getPreciseBacklog
+     *            Set to true to get precise backlog, Otherwise get imprecise backlog.
+     * @param subscriptionBacklogSize
+     *            Whether to get backlog size for each subscription.
      * @return the partitioned topic statistics
      * @throws NotAuthorizedException
      *             Don't have admin permission
@@ -946,11 +959,12 @@ public interface Topics {
      *             Unexpected error
      *
      */
-    PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog)
+    PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog,
+                                              boolean subscriptionBacklogSize)
             throws PulsarAdminException;
 
     default PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition) throws PulsarAdminException {
-        return getPartitionedStats(topic, perPartition, false);
+        return getPartitionedStats(topic, perPartition, false, false);
     }
 
     /**
@@ -960,13 +974,17 @@ public interface Topics {
      *            topic Name
      * @param perPartition
      *            flag to get stats per partition
+     * @param getPreciseBacklog
+     *            Set to true to get precise backlog, Otherwise get imprecise backlog.
+     * @param subscriptionBacklogSize
+     *            Whether to get backlog size for each subscription.
      * @return a future that can be used to track when the partitioned topic statistics are returned
      */
     CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
-            String topic, boolean perPartition, boolean getPreciseBacklog);
+            String topic, boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize);
 
     default CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic, boolean perPartition) {
-        return getPartitionedStatsAsync(topic, perPartition, false);
+        return getPartitionedStatsAsync(topic, perPartition, false, false);
     }
 
     /**
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index eca4c5f..848eabb 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -627,9 +627,11 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
-    public TopicStats getStats(String topic, boolean getPreciseBacklog) throws PulsarAdminException {
+    public TopicStats getStats(String topic, boolean getPreciseBacklog,
+                               boolean subscriptionBacklogSize) throws PulsarAdminException {
         try {
-            return getStatsAsync(topic, getPreciseBacklog).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+            return getStatsAsync(topic, getPreciseBacklog, subscriptionBacklogSize)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
         } catch (InterruptedException e) {
@@ -641,9 +643,12 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
-    public CompletableFuture<TopicStats> getStatsAsync(String topic, boolean getPreciseBacklog) {
+    public CompletableFuture<TopicStats> getStatsAsync(String topic, boolean getPreciseBacklog,
+                                                       boolean subscriptionBacklogSize) {
         TopicName tn = validateTopic(topic);
-        WebTarget path = topicPath(tn, "stats").queryParam("getPreciseBacklog", getPreciseBacklog);
+        WebTarget path = topicPath(tn, "stats")
+                .queryParam("getPreciseBacklog", getPreciseBacklog)
+                .queryParam("subscriptionBacklogSize", subscriptionBacklogSize);
         final CompletableFuture<TopicStats> future = new CompletableFuture<>();
         asyncGetRequest(path,
                 new InvocationCallback<TopicStats>() {
@@ -743,10 +748,11 @@ public class TopicsImpl extends BaseResource implements Topics {
     }
 
     @Override
-    public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog)
+    public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog,
+                                                     boolean subscriptionBacklogSize)
             throws PulsarAdminException {
         try {
-            return getPartitionedStatsAsync(topic, perPartition, getPreciseBacklog)
+            return getPartitionedStatsAsync(topic, perPartition, getPreciseBacklog, subscriptionBacklogSize)
                     .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
         } catch (ExecutionException e) {
             throw (PulsarAdminException) e.getCause();
@@ -760,10 +766,12 @@ public class TopicsImpl extends BaseResource implements Topics {
 
     @Override
     public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic,
-            boolean perPartition, boolean getPreciseBacklog) {
+            boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
         TopicName tn = validateTopic(topic);
         WebTarget path = topicPath(tn, "partitioned-stats");
-        path = path.queryParam("perPartition", perPartition).queryParam("getPreciseBacklog", getPreciseBacklog);
+        path = path.queryParam("perPartition", perPartition)
+                .queryParam("getPreciseBacklog", getPreciseBacklog)
+                .queryParam("subscriptionBacklogSize", subscriptionBacklogSize);
         final CompletableFuture<PartitionedTopicStats> future = new CompletableFuture<>();
         asyncGetRequest(path,
                 new InvocationCallback<PartitionedTopicStats>() {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 35ef40b..5e77900 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -707,7 +707,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).deleteSubscription("persistent://myprop/clust/ns1/ds1", "sub1", false);
 
         cmdTopics.run(split("stats persistent://myprop/clust/ns1/ds1"));
-        verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", false);
+        verify(mockTopics).getStats("persistent://myprop/clust/ns1/ds1", false, false);
 
         cmdTopics.run(split("stats-internal persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).getInternalStats("persistent://myprop/clust/ns1/ds1", false);
@@ -716,7 +716,7 @@ public class PulsarAdminToolTest {
         verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("partitioned-stats persistent://myprop/clust/ns1/ds1 --per-partition"));
-        verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true, false);
+        verify(mockTopics).getPartitionedStats("persistent://myprop/clust/ns1/ds1", true, false, false);
 
         cmdTopics.run(split("partitioned-stats-internal persistent://myprop/clust/ns1/ds1"));
         verify(mockTopics).getPartitionedInternalStats("persistent://myprop/clust/ns1/ds1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index a14d83f..169a4cd 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -521,10 +521,15 @@ public class CmdTopics extends CmdBase {
             "--get-precise-backlog" }, description = "Set true to get precise backlog")
         private boolean getPreciseBacklog = false;
 
+        @Parameter(names = { "-sbs",
+                "--get-subscription-backlog-size" }, description = "Set true to get backlog size for each subscription"
+        + ", locking required.")
+        private boolean subscriptionBacklogSize = false;
+
         @Override
         void run() throws PulsarAdminException {
             String topic = validateTopicName(params);
-            print(topics.getStats(topic, getPreciseBacklog));
+            print(topics.getStats(topic, getPreciseBacklog, subscriptionBacklogSize));
         }
     }
 
@@ -571,10 +576,15 @@ public class CmdTopics extends CmdBase {
             "--get-precise-backlog" }, description = "Set true to get precise backlog")
         private boolean getPreciseBacklog = false;
 
+        @Parameter(names = { "-sbs",
+                "--get-subscription-backlog-size" }, description = "Set true to get backlog size for each subscription"
+                + ", locking required.")
+        private boolean subscriptionBacklogSize = false;
+
         @Override
         void run() throws Exception {
             String topic = validateTopicName(params);
-            print(topics.getPartitionedStats(topic, perPartition, getPreciseBacklog));
+            print(topics.getPartitionedStats(topic, perPartition, getPreciseBacklog, subscriptionBacklogSize));
         }
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 1064b69..d9971f9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -53,6 +53,9 @@ public class SubscriptionStats {
     /** Number of messages in the subscription backlog. */
     public long msgBacklog;
 
+    /** Size of backlog in byte. **/
+    public long backlogSize;
+
     /** Number of messages in the subscription backlog that do not contain the delay messages. */
     public long msgBacklogNoDelayed;
 
@@ -122,6 +125,7 @@ public class SubscriptionStats {
         msgOutCounter = 0;
         msgRateRedeliver = 0;
         msgBacklog = 0;
+        backlogSize = 0;
         msgBacklogNoDelayed = 0;
         unackedMessages = 0;
         msgRateExpired = 0;
@@ -144,6 +148,7 @@ public class SubscriptionStats {
         this.msgOutCounter += stats.msgOutCounter;
         this.msgRateRedeliver += stats.msgRateRedeliver;
         this.msgBacklog += stats.msgBacklog;
+        this.backlogSize += stats.backlogSize;
         this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
         this.msgDelayed += stats.msgDelayed;
         this.unackedMessages += stats.unackedMessages;