You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 02:42:37 UTC
[pulsar] 03/04: [broker][monitoring] add message ack rate metric for consumer (#15674)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9796eb40bc68b0aefde722fd1f4f698812fca7a2
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Mon Jun 13 09:56:50 2022 +0800
[broker][monitoring] add message ack rate metric for consumer (#15674)
(cherry picked from commit 88b47e5e5ac1b7fcf895bd72b0545b24bdf61f7e)
---
.../org/apache/pulsar/broker/service/Consumer.java | 24 +++++--
.../apache/pulsar/broker/service/ServerCnx.java | 1 +
.../pulsar/broker/service/StreamingStats.java | 1 +
.../nonpersistent/NonPersistentSubscription.java | 1 +
.../service/nonpersistent/NonPersistentTopic.java | 4 ++
.../service/persistent/PersistentSubscription.java | 1 +
.../broker/service/persistent/PersistentTopic.java | 3 +
.../stats/prometheus/AggregatedConsumerStats.java | 2 +
.../stats/prometheus/AggregatedNamespaceStats.java | 2 +
.../prometheus/AggregatedSubscriptionStats.java | 2 +
.../stats/prometheus/NamespaceStatsAggregator.java | 3 +
.../pulsar/broker/stats/prometheus/TopicStats.java | 7 ++
.../pulsar/broker/stats/ConsumerStatsTest.java | 81 ++++++++++++++++++++++
.../pulsar/common/policies/data/ConsumerStats.java | 5 ++
.../common/policies/data/SubscriptionStats.java | 5 ++
.../policies/data/stats/ConsumerStatsImpl.java | 6 ++
.../policies/data/stats/SubscriptionStatsImpl.java | 5 ++
pulsar-common/src/main/proto/PulsarApi.proto | 3 +
18 files changed, 151 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 3f50ab7a7aa..349fbd860e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -86,6 +86,7 @@ public class Consumer {
private final Rate msgRedeliver;
private final LongAdder msgOutCounter;
private final LongAdder bytesOutCounter;
+ private final Rate messageAckRate;
private long lastConsumedTimestamp;
private long lastAckedTimestamp;
@@ -162,6 +163,7 @@ public class Consumer {
this.msgRedeliver = new Rate();
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
+ this.messageAckRate = new Rate();
this.appId = appId;
// Ensure we start from compacted view
@@ -366,6 +368,8 @@ public class Consumer {
}
public CompletableFuture<Void> messageAcked(CommandAck ack) {
+ CompletableFuture<Void> future;
+
this.lastAckedTimestamp = System.currentTimeMillis();
Map<String, Long> properties = Collections.emptyMap();
if (ack.getPropertiesCount() > 0) {
@@ -399,20 +403,27 @@ public class Consumer {
}
if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
List<PositionImpl> positionsAcked = Collections.singletonList(position);
- return transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
+ future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked);
} else {
List<Position> positionsAcked = Collections.singletonList(position);
subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
- return CompletableFuture.completedFuture(null);
+ future = CompletableFuture.completedFuture(null);
}
} else {
if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
- return individualAckWithTransaction(ack);
+ future = individualAckWithTransaction(ack);
} else {
- return individualAckNormal(ack, properties);
+ future = individualAckNormal(ack, properties);
}
}
+
+ return future
+ .whenComplete((__, t) -> {
+ if (t == null) {
+ this.messageAckRate.recordEvent(ack.getMessageIdsCount());
+ }
+ });
}
//this method is for individual ack not carry the transaction
@@ -742,7 +753,10 @@ public class Consumer {
msgOut.calculateRate();
chunkedMessageRate.calculateRate();
msgRedeliver.calculateRate();
+ messageAckRate.calculateRate();
+
stats.msgRateOut = msgOut.getRate();
+ stats.messageAckRate = messageAckRate.getRate();
stats.msgThroughputOut = msgOut.getValueRate();
stats.msgRateRedeliver = msgRedeliver.getRate();
stats.chunkedMessageRate = chunkedMessageRate.getRate();
@@ -755,7 +769,7 @@ public class Consumer {
lastAckedTimestamp = consumerStats.lastAckedTimestamp;
lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", topicName,
subscription, consumerStats.availablePermits, consumerId);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c4f9e590caa..f127acb1fa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -612,6 +612,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
.setConnectedSince(consumerStats.getConnectedSince())
.setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false))
.setMsgRateExpired(subscription.getExpiredMessageRate())
+ .setMessageAckRate(consumerStats.messageAckRate)
.setType(subscription.getTypeString());
return Commands.serializeWithSize(cmd);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
index 02dcb8233ff..469c802b76a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
@@ -65,6 +65,7 @@ public class StreamingStats {
statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);
statsStream.writePair("avgMessagesPerEntry", stats.avgMessagesPerEntry);
+ statsStream.writePair("messageAckRate", stats.messageAckRate);
if (Subscription.isIndividualAckMode(subType)) {
statsStream.writePair("unackedMessages", stats.unackedMessages);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index a9777f5dd0d..fed3b0f851a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -456,6 +456,7 @@ public class NonPersistentSubscription implements Subscription {
ConsumerStatsImpl consumerStats = consumer.getStats();
subStats.consumers.add(consumerStats);
subStats.msgRateOut += consumerStats.msgRateOut;
+ subStats.messageAckRate += consumerStats.messageAckRate;
subStats.msgThroughputOut += consumerStats.msgThroughputOut;
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
subStats.msgOutCounter += consumerStats.msgOutCounter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 82d610c12b7..fb6da69eb2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -724,6 +724,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
double subMsgRateOut = 0;
double subMsgThroughputOut = 0;
double subMsgRateRedeliver = 0;
+ double subMsgAckRate = 0;
// Start subscription name & consumers
try {
@@ -738,6 +739,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
ConsumerStatsImpl consumerStats = consumer.getStats();
subMsgRateOut += consumerStats.msgRateOut;
+ subMsgAckRate += consumerStats.messageAckRate;
+
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
@@ -752,6 +755,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false));
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
+ topicStatsStream.writePair("messageAckRate", subMsgAckRate);
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
topicStatsStream.writePair("type", subscription.getTypeString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 95dee2a3798..31ba5115ef6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -966,6 +966,7 @@ public class PersistentSubscription implements Subscription {
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
subStats.msgOutCounter += consumerStats.msgOutCounter;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
+ subStats.messageAckRate += consumerStats.messageAckRate;
subStats.chunkedMessageRate += consumerStats.chunkedMessageRate;
subStats.unackedMessages += consumerStats.unackedMessages;
subStats.lastConsumedTimestamp =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b1ccf0b23ec..629158c4d50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1722,6 +1722,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
double subMsgRateOut = 0;
double subMsgThroughputOut = 0;
double subMsgRateRedeliver = 0;
+ double subMsgAckRate = 0;
// Start subscription name & consumers
try {
@@ -1735,6 +1736,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
ConsumerStatsImpl consumerStats = consumer.getStats();
subMsgRateOut += consumerStats.msgRateOut;
+ subMsgAckRate += consumerStats.messageAckRate;
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
@@ -1749,6 +1751,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
subscription.getNumberOfEntriesInBacklog(true));
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
+ topicStatsStream.writePair("messageAckRate", subMsgAckRate);
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
index 8b6bf7d5c96..0a4bd317df5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -28,6 +28,8 @@ public class AggregatedConsumerStats {
public double msgRateOut;
+ public double msgAckRate;
+
public double msgThroughputOut;
public long availablePermits;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 8bacd0f582d..5610dbab218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -33,6 +33,7 @@ public class AggregatedNamespaceStats {
public double throughputIn;
public double throughputOut;
+ public long messageAckRate;
public long bytesInCounter;
public long msgInCounter;
public long bytesOutCounter;
@@ -122,6 +123,7 @@ public class AggregatedNamespaceStats {
consumerStats.blockedSubscriptionOnUnackedMsgs = v.blockedSubscriptionOnUnackedMsgs;
consumerStats.msgRateRedeliver += v.msgRateRedeliver;
consumerStats.unackedMessages += v.unackedMessages;
+ messageAckRate += v.msgAckRate;
});
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index c829be28e59..2a3d4ed533a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -36,6 +36,8 @@ public class AggregatedSubscriptionStats {
public double msgRateOut;
+ public double messageAckRate;
+
public double msgThroughputOut;
public long msgDelayed;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a3a0fcda445..229739b2fce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -131,6 +131,7 @@ public class NamespaceStatsAggregator {
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
+ subsStats.messageAckRate += cStats.messageAckRate;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
@@ -240,6 +241,7 @@ public class NamespaceStatsAggregator {
consumerStats.unackedMessages = conStats.unackedMessages;
consumerStats.msgRateRedeliver = conStats.msgRateRedeliver;
consumerStats.msgRateOut = conStats.msgRateOut;
+ consumerStats.msgAckRate = conStats.messageAckRate;
consumerStats.msgThroughputOut = conStats.msgThroughputOut;
consumerStats.bytesOutCounter = conStats.bytesOutCounter;
consumerStats.msgOutCounter = conStats.msgOutCounter;
@@ -324,6 +326,7 @@ public class NamespaceStatsAggregator {
metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+ metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 35dbdba274a..46be437eb23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -246,6 +246,8 @@ class TopicStats {
subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
+ metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
+ subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
@@ -282,6 +284,11 @@ class TopicStats {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
splitTopicAndPartitionIndexLabel);
+
+ metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+ "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
+ splitTopicAndPartitionIndexLabel);
+
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
splitTopicAndPartitionIndexLabel);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 2f702ab89f6..e064e56e993 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -20,14 +20,22 @@ package org.apache.pulsar.broker.stats;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -38,9 +46,13 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Slf4j
@@ -183,6 +195,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
"msgThroughputOut",
"bytesOutCounter",
"msgOutCounter",
+ "messageAckRate",
"msgRateRedeliver",
"chunkedMessageRate",
"consumerName",
@@ -220,4 +233,72 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
consumer.close();
}
+
+
+ @Test
+ public void testPersistentTopicMessageAckRateMetricTopicLevel() throws Exception {
+ String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID();
+ testMessageAckRateMetric(topicName, true);
+ }
+
+ @Test
+ public void testPersistentTopicMessageAckRateMetricNamespaceLevel() throws Exception {
+ String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID();
+ testMessageAckRateMetric(topicName, false);
+ }
+
+ private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics)
+ throws Exception {
+ final int messages = 100;
+ String subName = "test_sub";
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+ @Cleanup
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+ .subscriptionName(subName).isAckReceiptEnabled(true).subscribe();
+
+ String namespace = TopicName.get(topicName).getNamespace();
+
+ for (int i = 0; i < messages; i++) {
+ producer.send(UUID.randomUUID().toString());
+ }
+
+ for (int i = 0; i < messages; i++) {
+ Message<String> message = consumer.receive(20, TimeUnit.SECONDS);
+ if (message == null) {
+ break;
+ }
+
+ consumer.acknowledge(message);
+ }
+
+ Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
+ Subscription subscription = topic.getSubscription(subName);
+ List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers();
+ Assert.assertEquals(consumers.size(), 1);
+ org.apache.pulsar.broker.service.Consumer consumer1 = consumers.get(0);
+ consumer1.updateRates();
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
+ String metricStr = output.toString(StandardCharsets.UTF_8);
+
+ Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
+ Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate");
+ Assert.assertTrue(metrics.size() > 0);
+
+ int num = 0;
+ for (PrometheusMetricsTest.Metric metric : metrics) {
+ if (exposeTopicLevelMetrics && metric.tags.get("subscription").equals(subName)) {
+ num++;
+ Assert.assertTrue(metric.value > 0);
+ } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
+ num++;
+ Assert.assertTrue(metric.value > 0);
+ }
+ }
+
+ Assert.assertTrue(num > 0);
+ }
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index c78c17c8d9d..4c165083ab1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -40,6 +40,11 @@ public interface ConsumerStats {
/** Total rate of messages redelivered by this consumer (msg/s). */
double getMsgRateRedeliver();
+ /**
+ * Total rate of message ack(msg/s).
+ */
+ double getMessageAckRate();
+
/** The total rate of chunked messages delivered to this consumer. */
double getChunkedMessageRate();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 4175afb7140..39738d2eaa0 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -40,6 +40,11 @@ public interface SubscriptionStats {
/** Total rate of messages redelivered on this subscription (msg/s). */
double getMsgRateRedeliver();
+ /**
+ * Total rate of message ack(msg/s).
+ */
+ double getMessageAckRate();
+
/** Chunked message dispatch rate. */
int getChunkedMessageRate();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index 33d0a8d1344..ae9ba67148f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -45,6 +45,11 @@ public class ConsumerStatsImpl implements ConsumerStats {
/** Total rate of messages redelivered by this consumer (msg/s). */
public double msgRateRedeliver;
+ /**
+ * Total rate of message ack(msg/s).
+ */
+ public double messageAckRate;
+
/** The total rate of chunked messages delivered to this consumer. */
public double chunkedMessageRate;
@@ -103,6 +108,7 @@ public class ConsumerStatsImpl implements ConsumerStats {
public ConsumerStatsImpl add(ConsumerStatsImpl stats) {
Objects.requireNonNull(stats);
this.msgRateOut += stats.msgRateOut;
+ this.messageAckRate += stats.messageAckRate;
this.msgThroughputOut += stats.msgThroughputOut;
this.bytesOutCounter += stats.bytesOutCounter;
this.msgOutCounter += stats.msgOutCounter;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 8beaa6facfd..53112295aa2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -47,6 +47,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
/** Total rate of messages redelivered on this subscription (msg/s). */
public double msgRateRedeliver;
+ /**
+ * Total rate of message ack(msg/s).
+ */
+ public double messageAckRate;
+
/** Chunked message dispatch rate. */
public int chunkedMessageRate;
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 337c6bb2cd4..e66c2b4a70e 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -716,6 +716,9 @@ message CommandConsumerStatsResponse {
/// Number of messages in the subscription backlog
optional uint64 msgBacklog = 15;
+
+ /// Total rate of messages ack. msg/s
+ optional double messageAckRate = 16;
}
message CommandGetLastMessageId {