You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/27 03:33:17 UTC

[pulsar] branch master updated: Add msg and bytes count stats to broker-stats/topics (#7045)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 72100be  Add msg and bytes count stats to broker-stats/topics (#7045)
72100be is described below

commit 72100be988c9af7aa4767726b470e69d1e42e4d6
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed May 27 11:32:57 2020 +0800

    Add msg and bytes count stats to broker-stats/topics (#7045)
---
 .../main/java/org/apache/pulsar/broker/service/AbstractTopic.java | 8 ++++++++
 .../pulsar/broker/service/nonpersistent/NonPersistentTopic.java   | 4 ++++
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java  | 5 ++++-
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 414ebc6..ed9584e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -415,5 +415,13 @@ public abstract class AbstractTopic implements Topic {
         return this.bytesInCounter.longValue();
     }
 
+    public long getMsgOutCounter() {
+        return getStats(false).msgOutCounter;
+    }
+
+    public long getBytesOutCounter() {
+        return getStats(false).bytesOutCounter;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0a99a64..ca26a19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -733,6 +733,10 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
         topicStatsStream.writePair("msgThroughputIn", topicStats.aggMsgThroughputIn);
         topicStatsStream.writePair("msgThroughputOut", topicStats.aggMsgThroughputOut);
+        topicStatsStream.writePair("msgInCount", getMsgInCounter());
+        topicStatsStream.writePair("bytesInCount", getBytesInCounter());
+        topicStatsStream.writePair("msgOutCount", getMsgOutCounter());
+        topicStatsStream.writePair("bytesOutCount", getBytesOutCounter());
 
         nsStats.msgRateIn += topicStats.aggMsgRateIn;
         nsStats.msgRateOut += topicStats.aggMsgRateOut;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 25a4d30..cbf6718 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -36,7 +36,6 @@ import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -1458,6 +1457,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
         topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
         topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
+        topicStatsStream.writePair("msgInCount", getMsgInCounter());
+        topicStatsStream.writePair("bytesInCount", getBytesInCounter());
+        topicStatsStream.writePair("msgOutCount", getMsgOutCounter());
+        topicStatsStream.writePair("bytesOutCount", getBytesOutCounter());
         topicStatsStream.writePair("msgThroughputIn", topicStatsHelper.aggMsgThroughputIn);
         topicStatsStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
         topicStatsStream.writePair("storageSize", ledger.getTotalSize());