You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/12 13:44:54 UTC

[pulsar] 03/17: Expose pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer. (#6918)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c5644b4b5bac05b8c3f82b7eda89ee750e5c725
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat May 9 03:21:28 2020 +0800

    Expose pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer. (#6918)
    
    Fixes #6891
    Related to #5802
    
    Add pulsar_out_bytes_total and pulsar_out_messages_total for namespace/subscription/consumer.
    
    New unit test added.
    (cherry picked from commit 204f3271914f5fbdeb7bf0a7ff5a71e77b8cccbf)
---
 .../org/apache/pulsar/broker/service/Consumer.java |   9 ++
 .../nonpersistent/NonPersistentSubscription.java   |   2 +
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +
 .../service/persistent/PersistentSubscription.java |   2 +
 .../broker/service/persistent/PersistentTopic.java |   2 +
 .../stats/prometheus/AggregatedConsumerStats.java  |   4 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  10 ++
 .../prometheus/AggregatedSubscriptionStats.java    |   4 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  77 +++++----
 .../pulsar/broker/stats/prometheus/TopicStats.java |   8 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 172 ++++++++++++++++++++-
 .../pulsar/common/policies/data/ConsumerStats.java |   8 +
 .../common/policies/data/SubscriptionStats.java    |  10 ++
 .../pulsar/common/policies/data/TopicStats.java    |  19 ++-
 site2/docs/reference-metrics.md                    |   2 +
 15 files changed, 294 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c5f5bbf..31227db 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.mledger.Entry;
@@ -78,6 +79,8 @@ public class Consumer {
     private final String consumerName;
     private final Rate msgOut;
     private final Rate msgRedeliver;
+    private final LongAdder msgOutCounter;
+    private final LongAdder bytesOutCounter;
 
     private long lastConsumedTimestamp;
     private long lastAckedTimestamp;
@@ -129,6 +132,8 @@ public class Consumer {
         this.cnx = cnx;
         this.msgOut = new Rate();
         this.msgRedeliver = new Rate();
+        this.bytesOutCounter = new LongAdder();
+        this.msgOutCounter = new LongAdder();
         this.appId = appId;
         this.authenticationData = cnx.authenticationData;
         PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
@@ -222,6 +227,8 @@ public class Consumer {
         MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
         incrementUnackedMessages(totalMessages);
         msgOut.recordMultipleEvents(totalMessages, totalBytes);
+        msgOutCounter.add(totalMessages);
+        bytesOutCounter.add(totalBytes);
 
         ctx.channel().eventLoop().execute(() -> {
             for (int i = 0; i < entries.size(); i++) {
@@ -457,6 +464,8 @@ public class Consumer {
     }
 
     public ConsumerStats getStats() {
+        stats.msgOutCounter = msgOutCounter.longValue();
+        stats.bytesOutCounter = bytesOutCounter.longValue();
         stats.lastAckedTimestamp = lastAckedTimestamp;
         stats.lastConsumedTimestamp = lastConsumedTimestamp;
         stats.availablePermits = getAvailablePermits();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index f653ee5..316024a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -384,6 +384,8 @@ public class NonPersistentSubscription implements Subscription {
                 subStats.consumers.add(consumerStats);
                 subStats.msgRateOut += consumerStats.msgRateOut;
                 subStats.msgThroughputOut += consumerStats.msgThroughputOut;
+                subStats.bytesOutCounter += consumerStats.bytesOutCounter;
+                subStats.msgOutCounter += consumerStats.msgOutCounter;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
             });
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 332abfb..0a99a64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -778,6 +778,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
 
             stats.msgRateOut += subStats.msgRateOut;
             stats.msgThroughputOut += subStats.msgThroughputOut;
+            stats.bytesOutCounter += subStats.bytesOutCounter;
+            stats.msgOutCounter += subStats.msgOutCounter;
             stats.getSubscriptions().put(name, subStats);
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 5e6a213..25633f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -946,6 +946,8 @@ public class PersistentSubscription implements Subscription {
                 subStats.consumers.add(consumerStats);
                 subStats.msgRateOut += consumerStats.msgRateOut;
                 subStats.msgThroughputOut += consumerStats.msgThroughputOut;
+                subStats.bytesOutCounter += consumerStats.bytesOutCounter;
+                subStats.msgOutCounter += consumerStats.msgOutCounter;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
                 subStats.unackedMessages += consumerStats.unackedMessages;
                 subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b7470fa..28911bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1510,6 +1510,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
             stats.msgRateOut += subStats.msgRateOut;
             stats.msgThroughputOut += subStats.msgThroughputOut;
+            stats.bytesOutCounter += subStats.bytesOutCounter;
+            stats.msgOutCounter += subStats.msgOutCounter;
             stats.subscriptions.put(name, subStats);
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
index 0fadf3e..8b6bf7d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -31,4 +31,8 @@ public class AggregatedConsumerStats {
     public double msgThroughputOut;
 
     public long availablePermits;
+
+    long msgOutCounter;
+
+    long bytesOutCounter;
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index ea05ed0..1100523 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -34,6 +34,11 @@ public class AggregatedNamespaceStats {
     public double throughputIn;
     public double throughputOut;
 
+    public long bytesInCounter;
+    public long msgInCounter;
+    public long bytesOutCounter;
+    public long msgOutCounter;
+
     public long storageSize;
     public long msgBacklog;
     public long msgDelayed;
@@ -65,6 +70,11 @@ public class AggregatedNamespaceStats {
         throughputIn += stats.throughputIn;
         throughputOut += stats.throughputOut;
 
+        bytesInCounter += stats.bytesInCounter;
+        msgInCounter += stats.msgInCounter;
+        bytesOutCounter += stats.bytesOutCounter;
+        msgOutCounter += stats.msgOutCounter;
+
         storageSize += stats.storageSize;
         backlogSize += stats.backlogSize;
         offloadedStorageUsed += stats.offloadedStorageUsed;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index d5b5353..1f1e879 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -41,5 +41,9 @@ public class AggregatedSubscriptionStats {
 
     public long msgDelayed;
 
+    long msgOutCounter;
+
+    long bytesOutCounter;
+
     public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a7b35b8..f032ea1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
@@ -104,8 +105,11 @@ public class NamespaceStatsAggregator {
             stats.storageReadRate = mlStats.getReadEntriesRate();
         }
 
-        stats.msgInCounter = topic.getStats(getPreciseBacklog).msgInCounter;
-        stats.bytesInCounter = topic.getStats(getPreciseBacklog).bytesInCounter;
+        org.apache.pulsar.common.policies.data.TopicStats tStatus = topic.getStats(getPreciseBacklog);
+        stats.msgInCounter = tStatus.msgInCounter;
+        stats.bytesInCounter = tStatus.bytesInCounter;
+        stats.msgOutCounter = tStatus.msgOutCounter;
+        stats.bytesOutCounter = tStatus.bytesOutCounter;
 
         stats.producersCount = 0;
         topic.getProducers().values().forEach(producer -> {
@@ -123,43 +127,53 @@ public class NamespaceStatsAggregator {
             }
         });
 
-        topic.getSubscriptions().forEach((name, subscription) -> {
+        tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
             stats.subscriptionsCount++;
-            stats.msgBacklog += subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
+            stats.msgBacklog += subscriptionStats.msgBacklog;
 
             AggregatedSubscriptionStats subsStats = stats.subscriptionStats
-                    .computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
-            subsStats.msgBacklog = subscription.getNumberOfEntriesInBacklog(getPreciseBacklog);
-            subsStats.msgDelayed = subscription.getNumberOfEntriesDelayed();
+                    .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
+            subsStats.msgBacklog = subscriptionStats.msgBacklog;
+            subsStats.msgDelayed = subscriptionStats.msgDelayed;
             subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
+            stats.rateOut += subsStats.msgRateOut;
+            stats.throughputOut += subsStats.msgThroughputOut;
+            subscriptionStats.consumers.forEach(cStats -> {
+                stats.consumersCount++;
+                subsStats.unackedMessages += cStats.unackedMessages;
+                subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
+                subsStats.msgRateOut += cStats.msgRateOut;
+                subsStats.msgThroughputOut += cStats.msgThroughputOut;
+                subsStats.bytesOutCounter += cStats.bytesOutCounter;
+                subsStats.msgOutCounter += cStats.msgOutCounter;
+                if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
+                    subsStats.blockedSubscriptionOnUnackedMsgs = true;
+                }
+            });
+        });
 
-            subscription.getConsumers().forEach(consumer -> {
+        // Consumer stats can be a lot if a subscription has many consumers
+        if (includeConsumerMetrics) {
+            topic.getSubscriptions().forEach((name, subscription) -> {
+                AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+                        .computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
+                subscription.getConsumers().forEach(consumer -> {
+                    ConsumerStats conStats = consumer.getStats();
 
-                // Consumer stats can be a lot if a subscription has many consumers
-                if (includeConsumerMetrics) {
                     AggregatedConsumerStats consumerStats = subsStats.consumerStat
                             .computeIfAbsent(consumer, k -> new AggregatedConsumerStats());
-                    consumerStats.unackedMessages = consumer.getStats().unackedMessages;
-                    consumerStats.msgRateRedeliver = consumer.getStats().msgRateRedeliver;
-                    consumerStats.msgRateOut = consumer.getStats().msgRateOut;
-                    consumerStats.msgThroughputOut = consumer.getStats().msgThroughputOut;
-                    consumerStats.availablePermits = consumer.getStats().availablePermits;
-                    consumerStats.blockedSubscriptionOnUnackedMsgs = consumer.getStats().blockedConsumerOnUnackedMsgs;
-                }
-
-                subsStats.unackedMessages += consumer.getStats().unackedMessages;
-                subsStats.msgRateRedeliver += consumer.getStats().msgRateRedeliver;
-                subsStats.msgRateOut += consumer.getStats().msgRateOut;
-                subsStats.msgThroughputOut += consumer.getStats().msgThroughputOut;
-                if (!subsStats.blockedSubscriptionOnUnackedMsgs && consumer.getStats().blockedConsumerOnUnackedMsgs) {
-                    subsStats.blockedSubscriptionOnUnackedMsgs = true;
-                }
 
-                stats.consumersCount++;
-                stats.rateOut += consumer.getStats().msgRateOut;
-                stats.throughputOut += consumer.getStats().msgThroughputOut;
+                    consumerStats.unackedMessages = conStats.unackedMessages;
+                    consumerStats.msgRateRedeliver = conStats.msgRateRedeliver;
+                    consumerStats.msgRateOut = conStats.msgRateOut;
+                    consumerStats.msgThroughputOut = conStats.msgThroughputOut;
+                    consumerStats.bytesOutCounter = conStats.bytesOutCounter;
+                    consumerStats.msgOutCounter = conStats.msgOutCounter;
+                    consumerStats.availablePermits = conStats.availablePermits;
+                    consumerStats.blockedSubscriptionOnUnackedMsgs = conStats.blockedConsumerOnUnackedMsgs;
+                });
             });
-        });
+        }
 
         topic.getReplicators().forEach((cluster, replicator) -> {
             AggregatedReplicationStats aggReplStats = stats.replicationStats.computeIfAbsent(cluster,
@@ -206,6 +220,11 @@ public class NamespaceStatsAggregator {
         metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
         metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
 
+        metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
+        metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
+        metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
+        metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
+
         metric(stream, cluster, namespace, "pulsar_storage_size", stats.storageSize);
         metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.backlogSize);
         metric(stream, cluster, namespace, "pulsar_storage_offloaded_size", stats.offloadedStorageUsed);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index caf0ce8..0510d34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -36,6 +36,8 @@ class TopicStats {
     double throughputOut;
     long msgInCounter;
     long bytesInCounter;
+    long msgOutCounter;
+    long bytesOutCounter;
 
     long storageSize;
     public long msgBacklog;
@@ -67,6 +69,8 @@ class TopicStats {
         throughputOut = 0;
         bytesInCounter = 0;
         msgInCounter = 0;
+        bytesOutCounter = 0;
+        msgOutCounter = 0;
 
         storageSize = 0;
         msgBacklog = 0;
@@ -141,6 +145,8 @@ class TopicStats {
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages", subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out", subsStats.msgRateOut);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out", subsStats.msgThroughputOut);
+            metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total", subsStats.bytesOutCounter);
+            metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total", subsStats.msgOutCounter);
             subsStats.consumerStat.forEach((c, consumerStats) -> {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_unacked_messages", consumerStats.unackedMessages);
@@ -148,6 +154,8 @@ class TopicStats {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut);
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_consumer_available_permits", consumerStats.availablePermits);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_bytes_total", consumerStats.bytesOutCounter);
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(), "pulsar_out_messages_total", consumerStats.msgOutCounter);
             });
         });
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index e569caf..92abb5c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -34,9 +34,12 @@ import java.util.regex.Pattern;
 
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.base.MoreObjects;
@@ -45,13 +48,14 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 
 public class PrometheusMetricsTest extends BrokerTestBase {
-    @BeforeClass
+
+    @BeforeMethod
     @Override
     protected void setup() throws Exception {
         super.baseSetup();
     }
 
-    @AfterClass
+    @AfterMethod
     @Override
     protected void cleanup() throws Exception {
         super.internalCleanup();
@@ -61,12 +65,30 @@ public class PrometheusMetricsTest extends BrokerTestBase {
     public void testPerTopicStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
         Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
-        for (int i = 0; i < 10; i++) {
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
             String message = "my-message-" + i;
             p1.send(message.getBytes());
             p2.send(message.getBytes());
         }
 
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+        }
+
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, true, false, statsOut);
         String metricsStr = new String(statsOut.toByteArray());
@@ -109,20 +131,58 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
+        cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(0).tags.get("subscription"), "test");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("subscription"), "test");
+
+        cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
+        assertEquals(cm.size(), 2);
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(0).tags.get("subscription"), "test");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("subscription"), "test");
+
         p1.close();
         p2.close();
+        c1.close();
+        c2.close();
     }
 
     @Test
     public void testPerNamespaceStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
         Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
-        for (int i = 0; i < 10; i++) {
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
             String message = "my-message-" + i;
             p1.send(message.getBytes());
             p2.send(message.getBytes());
         }
 
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+        }
+
         ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
         String metricsStr = new String(statsOut.toByteArray());
@@ -141,12 +201,114 @@ public class PrometheusMetricsTest extends BrokerTestBase {
 
         cm = (List<Metric>) metrics.get("pulsar_producers_count");
         assertEquals(cm.size(), 2);
-        assertEquals(cm.get(1).value, 2.0);
         assertNull(cm.get(1).tags.get("topic"));
         assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
 
+        cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+
+        p1.close();
+        p2.close();
+        c1.close();
+        c2.close();
+    }
+
+    @Test
+    public void testPerConsumerStats() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+                .topic("persistent://my-property/use/my-ns/my-topic2")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 10;
+
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, true, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+
+        metrics.entries().forEach(e -> {
+            System.out.println(e.getKey() + ": " + e.getValue());
+        });
+
+        // There should be 4 metrics: one per subscription and one per consumer, for each of the two topics
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
+        assertEquals(cm.size(), 4);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("subscription"), "test");
+
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(1).tags.get("subscription"), "test");
+        assertEquals(cm.get(1).tags.get("consumer_id"), "1");
+
+        assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(2).tags.get("subscription"), "test");
+
+        assertEquals(cm.get(3).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(3).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(3).tags.get("subscription"), "test");
+        assertEquals(cm.get(3).tags.get("consumer_id"), "0");
+
+        cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
+        assertEquals(cm.size(), 4);
+        assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(0).tags.get("subscription"), "test");
+
+        assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
+        assertEquals(cm.get(1).tags.get("subscription"), "test");
+        assertEquals(cm.get(1).tags.get("consumer_id"), "1");
+
+        assertEquals(cm.get(2).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(2).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(2).tags.get("subscription"), "test");
+
+        assertEquals(cm.get(3).tags.get("namespace"), "my-property/use/my-ns");
+        assertEquals(cm.get(3).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
+        assertEquals(cm.get(3).tags.get("subscription"), "test");
+        assertEquals(cm.get(3).tags.get("consumer_id"), "0");
+
         p1.close();
         p2.close();
+        c1.close();
+        c2.close();
     }
 
     /** Checks for duplicate type definitions for a metric in the Prometheus metrics output. If the Prometheus parser
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index 7411f03..0ecb944 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -32,6 +32,12 @@ public class ConsumerStats {
     /** Total throughput delivered to the consumer (bytes/s). */
     public double msgThroughputOut;
 
+    /** Total bytes delivered to the consumer (bytes). */
+    public long bytesOutCounter;
+
+    /** Total messages delivered to the consumer (msg). */
+    public long msgOutCounter;
+
     /** Total rate of messages redelivered by this consumer (msg/s). */
     public double msgRateRedeliver;
 
@@ -75,6 +81,8 @@ public class ConsumerStats {
         checkNotNull(stats);
         this.msgRateOut += stats.msgRateOut;
         this.msgThroughputOut += stats.msgThroughputOut;
+        this.bytesOutCounter += stats.bytesOutCounter;
+        this.msgOutCounter += stats.msgOutCounter;
         this.msgRateRedeliver += stats.msgRateRedeliver;
         this.availablePermits += stats.availablePermits;
         this.unackedMessages += stats.unackedMessages;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 7064883..df8fc72 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -34,6 +34,12 @@ public class SubscriptionStats {
     /** Total throughput delivered on this subscription (bytes/s). */
     public double msgThroughputOut;
 
+    /** Total bytes delivered on this subscription (bytes). */
+    public long bytesOutCounter;
+
+    /** Total messages delivered on this subscription (msg). */
+    public long msgOutCounter;
+
     /** Total rate of messages redelivered on this subscription (msg/s). */
     public double msgRateRedeliver;
 
@@ -86,6 +92,8 @@ public class SubscriptionStats {
     public void reset() {
         msgRateOut = 0;
         msgThroughputOut = 0;
+        bytesOutCounter = 0;
+        msgOutCounter = 0;
         msgRateRedeliver = 0;
         msgBacklog = 0;
         msgBacklogNoDelayed = 0;
@@ -101,6 +109,8 @@ public class SubscriptionStats {
         checkNotNull(stats);
         this.msgRateOut += stats.msgRateOut;
         this.msgThroughputOut += stats.msgThroughputOut;
+        this.bytesOutCounter += stats.bytesOutCounter;
+        this.msgOutCounter += stats.msgOutCounter;
         this.msgRateRedeliver += stats.msgRateRedeliver;
         this.msgBacklog += stats.msgBacklog;
         this.msgBacklogNoDelayed += stats.msgBacklogNoDelayed;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index 9962fa9..be471e3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -43,6 +43,18 @@ public class TopicStats {
     /** Total throughput of messages dispatched for the topic (byte/s). */
     public double msgThroughputOut;
 
+    /** Total bytes published to the topic (bytes). */
+    public long bytesInCounter;
+
+    /** Total messages published to the topic (msg). */
+    public long msgInCounter;
+
+    /** Total bytes dispatched for the topic (bytes). */
+    public long bytesOutCounter;
+
+    /** Total messages dispatched for the topic (msg). */
+    public long msgOutCounter;
+
     /** Average size of published messages (bytes). */
     public double averageMsgSize;
 
@@ -63,9 +75,6 @@ public class TopicStats {
 
     public String deduplicationStatus;
 
-    public long bytesInCounter;
-    public long msgInCounter;
-
     public TopicStats() {
         this.publishers = Lists.newArrayList();
         this.subscriptions = Maps.newHashMap();
@@ -83,6 +92,8 @@ public class TopicStats {
         this.backlogSize = 0;
         this.bytesInCounter = 0;
         this.msgInCounter = 0;
+        this.bytesOutCounter = 0;
+        this.msgOutCounter = 0;
         this.publishers.clear();
         this.subscriptions.clear();
         this.replication.clear();
@@ -100,6 +111,8 @@ public class TopicStats {
         this.msgThroughputOut += stats.msgThroughputOut;
         this.bytesInCounter += stats.bytesInCounter;
         this.msgInCounter += stats.msgInCounter;
+        this.bytesOutCounter += stats.bytesOutCounter;
+        this.msgOutCounter += stats.msgOutCounter;
         double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count;
         this.averageMsgSize = newAverageMsgSize;
         this.storageSize += stats.storageSize;
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 3983dcd..b57406c 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -174,6 +174,8 @@ All the topic metrics are labelled with the following labels:
 | pulsar_entry_size_le_* | Histogram | The entry rate of a topic that the entry size is smaller with a given threshold.<br> Available thresholds: <br><ul><li>pulsar_entry_size_le_128: <= 128 bytes </li><li>pulsar_entry_size_le_512: <= 512 bytes</li><li>pulsar_entry_size_le_1_kb: <= 1 KB</li><li>pulsar_entry_size_le_2_kb: <= 2 KB</li><li>pulsar_entry_size_le_4_kb: <= 4 KB</li><li>pulsar_entry_size_le_16_kb: <= 16 KB</li><li>pulsar_entry_size_le_100_kb: <= 100 KB</li><li>pulsar_entry_size_ [...]
 | pulsar_in_bytes_total | Counter | The total number of bytes received for this topic |
 | pulsar_producers_count | Counter | The total number of messages received for this topic |
+| pulsar_out_bytes_total | Counter | The total number of bytes read from this topic |
+| pulsar_out_messages_total | Counter | The total number of messages read from this topic |
 
 #### Replication metrics