You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/07/28 01:44:34 UTC
[pulsar] branch master updated: [improve][admin] Expose last consumed flow timestamp for consumer stats (#16817)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bff696d40e9 [improve][admin] Expose last consumed flow timestamp for consumer stats (#16817)
bff696d40e9 is described below
commit bff696d40e94f1906b22537833064f1a6f91e1f1
Author: Penghui Li <pe...@apache.org>
AuthorDate: Thu Jul 28 09:44:27 2022 +0800
[improve][admin] Expose last consumed flow timestamp for consumer stats (#16817)
---
.../src/main/java/org/apache/pulsar/broker/service/Consumer.java | 4 ++++
.../java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java | 7 +++++--
.../java/org/apache/pulsar/common/policies/data/ConsumerStats.java | 1 +
.../pulsar/common/policies/data/stats/ConsumerStatsImpl.java | 2 ++
4 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 8d63a283046..02304fe7ff3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -88,6 +88,7 @@ public class Consumer {
private long lastConsumedTimestamp;
private long lastAckedTimestamp;
+ private long lastConsumedFlowTimestamp;
private Rate chunkedMessageRate;
// Represents how many messages we can safely send to the consumer without
@@ -681,6 +682,7 @@ public class Consumer {
public void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);
+ this.lastConsumedFlowTimestamp = System.currentTimeMillis();
// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) {
@@ -773,6 +775,7 @@ public class Consumer {
msgOut.recordMultipleEvents(consumerStats.msgOutCounter, consumerStats.bytesOutCounter);
lastAckedTimestamp = consumerStats.lastAckedTimestamp;
lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
+ lastConsumedFlowTimestamp = consumerStats.lastConsumedFlowTimestamp;
MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", topicName,
@@ -788,6 +791,7 @@ public class Consumer {
stats.bytesOutCounter = bytesOutCounter.longValue();
stats.lastAckedTimestamp = lastAckedTimestamp;
stats.lastConsumedTimestamp = lastConsumedTimestamp;
+ stats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp;
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 66d13be10e6..106a40084d1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -207,6 +207,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
"readPositionWhenJoining",
"lastAckedTimestamp",
"lastConsumedTimestamp",
+ "lastConsumedFlowTimestamp",
"keyHashRanges",
"metadata",
"address",
@@ -224,8 +225,10 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
TopicStats stats = admin.topics().getStats(topicName);
ObjectMapper mapper = ObjectMapperFactory.create();
- JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(stats.getSubscriptions()
- .get(subName).getConsumers().get(0)));
+ ConsumerStats consumerStats = stats.getSubscriptions()
+ .get(subName).getConsumers().get(0);
+ Assert.assertTrue(consumerStats.getLastConsumedFlowTimestamp() > 0);
+ JsonNode node = mapper.readTree(mapper.writer().writeValueAsString(consumerStats));
Iterator<String> itr = node.fieldNames();
while (itr.hasNext()) {
String field = itr.next();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index b796ae504b9..5cccd437be4 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -83,6 +83,7 @@ public interface ConsumerStats {
long getLastAckedTimestamp();
long getLastConsumedTimestamp();
+ long getLastConsumedFlowTimestamp();
/** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/
List<String> getKeyHashRanges();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index cfae8c9464a..f84e10eaac2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -98,6 +98,8 @@ public class ConsumerStatsImpl implements ConsumerStats {
public long lastAckedTimestamp;
public long lastConsumedTimestamp;
+ public long lastConsumedFlowTimestamp;
+
/** Hash ranges assigned to this consumer if is Key_Shared sub mode. **/
public List<String> keyHashRanges;