You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/28 01:44:34 UTC

[pulsar] branch master updated: [improve][admin] Expose last consumed flow timestamp for consumer stats (#16817)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bff696d40e9 [improve][admin] Expose last consumed flow timestamp for consumer stats (#16817)
bff696d40e9 is described below

commit bff696d40e94f1906b22537833064f1a6f91e1f1
Author: Penghui Li <pe...@apache.org>
AuthorDate: Thu Jul 28 09:44:27 2022 +0800

    [improve][admin] Expose last consumed flow timestamp for consumer stats (#16817)
---
 .../src/main/java/org/apache/pulsar/broker/service/Consumer.java   | 4 ++++
 .../java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java     | 7 +++++--
 .../java/org/apache/pulsar/common/policies/data/ConsumerStats.java | 1 +
 .../pulsar/common/policies/data/stats/ConsumerStatsImpl.java       | 2 ++
 4 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 8d63a283046..02304fe7ff3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -88,6 +88,7 @@ public class Consumer {
 
     private long lastConsumedTimestamp;
     private long lastAckedTimestamp;
+    private long lastConsumedFlowTimestamp;
     private Rate chunkedMessageRate;
 
     // Represents how many messages we can safely send to the consumer without
@@ -681,6 +682,7 @@ public class Consumer {
 
     public void flowPermits(int additionalNumberOfMessages) {
         checkArgument(additionalNumberOfMessages > 0);
+        this.lastConsumedFlowTimestamp = System.currentTimeMillis();
 
         // block shared consumer when unacked-messages reaches limit
         if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) {
@@ -773,6 +775,7 @@ public class Consumer {
         msgOut.recordMultipleEvents(consumerStats.msgOutCounter, consumerStats.bytesOutCounter);
         lastAckedTimestamp = consumerStats.lastAckedTimestamp;
         lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
+        lastConsumedFlowTimestamp = consumerStats.lastConsumedFlowTimestamp;
         MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", topicName,
@@ -788,6 +791,7 @@ public class Consumer {
         stats.bytesOutCounter = bytesOutCounter.longValue();
         stats.lastAckedTimestamp = lastAckedTimestamp;
         stats.lastConsumedTimestamp = lastConsumedTimestamp;
+        stats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
         stats.availablePermits = getAvailablePermits();
         stats.unackedMessages = unackedMessages;
         stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 66d13be10e6..106a40084d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -207,6 +207,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
                 "readPositionWhenJoining",
                 "lastAckedTimestamp",
                 "lastConsumedTimestamp",
+                "lastConsumedFlowTimestamp",
                 "keyHashRanges",
                 "metadata",
                 "address",
@@ -224,8 +225,10 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
 
         TopicStats stats = admin.topics().getStats(topicName);
         ObjectMapper mapper = ObjectMapperFactory.create();
-        JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(stats.getSubscriptions()
-                .get(subName).getConsumers().get(0)));
+        ConsumerStats consumerStats = stats.getSubscriptions()
+                .get(subName).getConsumers().get(0);
+        Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
+        JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(consumerStats));
         Iterator<String> itr = node.fieldNames();
         while (itr.hasNext()) {
             String field = itr.next();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index b796ae504b9..5cccd437be4 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -83,6 +83,7 @@ public interface ConsumerStats {
 
     long getLastAckedTimestamp();
     long getLastConsumedTimestamp();
+    long getLastConsumedFlowTimestamp();
 
     /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/
     List<String> getKeyHashRanges();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index cfae8c9464a..f84e10eaac2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -98,6 +98,8 @@ public class ConsumerStatsImpl implements ConsumerStats {
     public long lastAckedTimestamp;
     public long lastConsumedTimestamp;
 
+    public long lastConsumedFlowTimestamp;
+
     /** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/
     public List<String> keyHashRanges;