You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 02:42:37 UTC

[pulsar] 03/04: [broker][monitoring] add message ack rate metric for consumer (#15674)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9796eb40bc68b0aefde722fd1f4f698812fca7a2
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Mon Jun 13 09:56:50 2022 +0800

    [broker][monitoring] add message ack rate metric for consumer (#15674)
    
    (cherry picked from commit 88b47e5e5ac1b7fcf895bd72b0545b24bdf61f7e)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 24 +++++--
 .../apache/pulsar/broker/service/ServerCnx.java    |  1 +
 .../pulsar/broker/service/StreamingStats.java      |  1 +
 .../nonpersistent/NonPersistentSubscription.java   |  1 +
 .../service/nonpersistent/NonPersistentTopic.java  |  4 ++
 .../service/persistent/PersistentSubscription.java |  1 +
 .../broker/service/persistent/PersistentTopic.java |  3 +
 .../stats/prometheus/AggregatedConsumerStats.java  |  2 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  2 +
 .../prometheus/AggregatedSubscriptionStats.java    |  2 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  3 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |  7 ++
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 81 ++++++++++++++++++++++
 .../pulsar/common/policies/data/ConsumerStats.java |  5 ++
 .../common/policies/data/SubscriptionStats.java    |  5 ++
 .../policies/data/stats/ConsumerStatsImpl.java     |  6 ++
 .../policies/data/stats/SubscriptionStatsImpl.java |  5 ++
 pulsar-common/src/main/proto/PulsarApi.proto       |  3 +
 18 files changed, 151 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 3f50ab7a7aa..349fbd860e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -86,6 +86,7 @@ public class Consumer {
     private final Rate msgRedeliver;
     private final LongAdder msgOutCounter;
     private final LongAdder bytesOutCounter;
+    private final Rate messageAckRate;
 
     private long lastConsumedTimestamp;
     private long lastAckedTimestamp;
@@ -162,6 +163,7 @@ public class Consumer {
         this.msgRedeliver = new Rate();
         this.bytesOutCounter = new LongAdder();
         this.msgOutCounter = new LongAdder();
+        this.messageAckRate = new Rate();
         this.appId = appId;
 
         // Ensure we start from compacted view
@@ -366,6 +368,8 @@ public class Consumer {
     }
 
     public CompletableFuture<Void> messageAcked(CommandAck ack) {
+        CompletableFuture<Void> future;
+
         this.lastAckedTimestamp = System.currentTimeMillis();
         Map<String, Long> properties = Collections.emptyMap();
         if (ack.getPropertiesCount() > 0) {
@@ -399,20 +403,27 @@ public class Consumer {
             }
             if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
                 List<PositionImpl> positionsAcked = Collections.singletonList(position);
-                return transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
+                future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
                         ack.getTxnidLeastBits(), positionsAcked);
             } else {
                 List<Position> positionsAcked = Collections.singletonList(position);
                 subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
-                return CompletableFuture.completedFuture(null);
+                future = CompletableFuture.completedFuture(null);
             }
         } else {
             if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
-                return individualAckWithTransaction(ack);
+                future = individualAckWithTransaction(ack);
             } else {
-                return individualAckNormal(ack, properties);
+                future = individualAckNormal(ack, properties);
             }
         }
+
+        return future
+                .whenComplete((__, t) -> {
+                    if (t == null) {
+                        this.messageAckRate.recordEvent(ack.getMessageIdsCount());
+                    }
+                });
     }
 
     //this method is for individual ack not carry the transaction
@@ -742,7 +753,10 @@ public class Consumer {
         msgOut.calculateRate();
         chunkedMessageRate.calculateRate();
         msgRedeliver.calculateRate();
+        messageAckRate.calculateRate();
+
         stats.msgRateOut = msgOut.getRate();
+        stats.messageAckRate = messageAckRate.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
         stats.msgRateRedeliver = msgRedeliver.getRate();
         stats.chunkedMessageRate = chunkedMessageRate.getRate();
@@ -755,7 +769,7 @@ public class Consumer {
         lastAckedTimestamp = consumerStats.lastAckedTimestamp;
         lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
         MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
-        if (log.isDebugEnabled()){
+        if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", topicName,
                     subscription, consumerStats.availablePermits, consumerId);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c4f9e590caa..f127acb1fa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -612,6 +612,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 .setConnectedSince(consumerStats.getConnectedSince())
                 .setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false))
                 .setMsgRateExpired(subscription.getExpiredMessageRate())
+                .setMessageAckRate(consumerStats.messageAckRate)
                 .setType(subscription.getTypeString());
 
         return Commands.serializeWithSize(cmd);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
index 02dcb8233ff..469c802b76a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
@@ -65,6 +65,7 @@ public class StreamingStats {
         statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
         statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);
         statsStream.writePair("avgMessagesPerEntry", stats.avgMessagesPerEntry);
+        statsStream.writePair("messageAckRate", stats.messageAckRate);
 
         if (Subscription.isIndividualAckMode(subType)) {
             statsStream.writePair("unackedMessages", stats.unackedMessages);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index a9777f5dd0d..fed3b0f851a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -456,6 +456,7 @@ public class NonPersistentSubscription implements Subscription {
                 ConsumerStatsImpl consumerStats = consumer.getStats();
                 subStats.consumers.add(consumerStats);
                 subStats.msgRateOut += consumerStats.msgRateOut;
+                subStats.messageAckRate += consumerStats.messageAckRate;
                 subStats.msgThroughputOut += consumerStats.msgThroughputOut;
                 subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                 subStats.msgOutCounter += consumerStats.msgOutCounter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 82d610c12b7..fb6da69eb2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -724,6 +724,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
             double subMsgRateOut = 0;
             double subMsgThroughputOut = 0;
             double subMsgRateRedeliver = 0;
+            double subMsgAckRate = 0;
 
             // Start subscription name & consumers
             try {
@@ -738,6 +739,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
 
                     ConsumerStatsImpl consumerStats = consumer.getStats();
                     subMsgRateOut += consumerStats.msgRateOut;
+                    subMsgAckRate += consumerStats.messageAckRate;
+
                     subMsgThroughputOut += consumerStats.msgThroughputOut;
                     subMsgRateRedeliver += consumerStats.msgRateRedeliver;
 
@@ -752,6 +755,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                 topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false));
                 topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
+                topicStatsStream.writePair("messageAckRate", subMsgAckRate);
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 95dee2a3798..31ba5115ef6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -966,6 +966,7 @@ public class PersistentSubscription implements Subscription {
                 subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                 subStats.msgOutCounter += consumerStats.msgOutCounter;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
+                subStats.messageAckRate += consumerStats.messageAckRate;
                 subStats.chunkedMessageRate += consumerStats.chunkedMessageRate;
                 subStats.unackedMessages += consumerStats.unackedMessages;
                 subStats.lastConsumedTimestamp =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b1ccf0b23ec..629158c4d50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1722,6 +1722,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             double subMsgRateOut = 0;
             double subMsgThroughputOut = 0;
             double subMsgRateRedeliver = 0;
+            double subMsgAckRate = 0;
 
             // Start subscription name & consumers
             try {
@@ -1735,6 +1736,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
                     ConsumerStatsImpl consumerStats = consumer.getStats();
                     subMsgRateOut += consumerStats.msgRateOut;
+                    subMsgAckRate += consumerStats.messageAckRate;
                     subMsgThroughputOut += consumerStats.msgThroughputOut;
                     subMsgRateRedeliver += consumerStats.msgRateRedeliver;
 
@@ -1749,6 +1751,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         subscription.getNumberOfEntriesInBacklog(true));
                 topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
+                topicStatsStream.writePair("messageAckRate", subMsgAckRate);
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
index 8b6bf7d5c96..0a4bd317df5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -28,6 +28,8 @@ public class AggregatedConsumerStats {
 
     public double msgRateOut;
 
+    public double msgAckRate;
+
     public double msgThroughputOut;
 
     public long availablePermits;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 8bacd0f582d..5610dbab218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -33,6 +33,7 @@ public class AggregatedNamespaceStats {
     public double throughputIn;
     public double throughputOut;
 
+    public long messageAckRate;
     public long bytesInCounter;
     public long msgInCounter;
     public long bytesOutCounter;
@@ -122,6 +123,7 @@ public class AggregatedNamespaceStats {
                 consumerStats.blockedSubscriptionOnUnackedMsgs = v.blockedSubscriptionOnUnackedMsgs;
                 consumerStats.msgRateRedeliver += v.msgRateRedeliver;
                 consumerStats.unackedMessages += v.unackedMessages;
+                messageAckRate += v.msgAckRate;
             });
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index c829be28e59..2a3d4ed533a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -36,6 +36,8 @@ public class AggregatedSubscriptionStats {
 
     public double msgRateOut;
 
+    public double messageAckRate;
+
     public double msgThroughputOut;
 
     public long msgDelayed;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a3a0fcda445..229739b2fce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -131,6 +131,7 @@ public class NamespaceStatsAggregator {
             subsStats.unackedMessages += cStats.unackedMessages;
             subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
             subsStats.msgRateOut += cStats.msgRateOut;
+            subsStats.messageAckRate += cStats.messageAckRate;
             subsStats.msgThroughputOut += cStats.msgThroughputOut;
             subsStats.bytesOutCounter += cStats.bytesOutCounter;
             subsStats.msgOutCounter += cStats.msgOutCounter;
@@ -240,6 +241,7 @@ public class NamespaceStatsAggregator {
                     consumerStats.unackedMessages = conStats.unackedMessages;
                     consumerStats.msgRateRedeliver = conStats.msgRateRedeliver;
                     consumerStats.msgRateOut = conStats.msgRateOut;
+                    consumerStats.msgAckRate = conStats.messageAckRate;
                     consumerStats.msgThroughputOut = conStats.msgThroughputOut;
                     consumerStats.bytesOutCounter = conStats.bytesOutCounter;
                     consumerStats.msgOutCounter = conStats.msgOutCounter;
@@ -324,6 +326,7 @@ public class NamespaceStatsAggregator {
         metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
         metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
         metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+        metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
 
         metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
         metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 35dbdba274a..46be437eb23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -246,6 +246,8 @@ class TopicStats {
                     subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
                     subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
+                    subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
                     subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
@@ -282,6 +284,11 @@ class TopicStats {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
                         splitTopicAndPartitionIndexLabel);
+
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
+                        splitTopicAndPartitionIndexLabel);
+
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
                         splitTopicAndPartitionIndexLabel);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 2f702ab89f6..e064e56e993 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -20,14 +20,22 @@ package org.apache.pulsar.broker.stats;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -38,9 +46,13 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -183,6 +195,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
                 "msgThroughputOut",
                 "bytesOutCounter",
                 "msgOutCounter",
+                "messageAckRate",
                 "msgRateRedeliver",
                 "chunkedMessageRate",
                 "consumerName",
@@ -220,4 +233,72 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
 
         consumer.close();
     }
+
+
+    @Test
+    public void testPersistentTopicMessageAckRateMetricTopicLevel() throws Exception {
+        String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID();
+        testMessageAckRateMetric(topicName, true);
+    }
+
+    @Test
+    public void testPersistentTopicMessageAckRateMetricNamespaceLevel() throws Exception {
+        String topicName = "persistent://public/default/msg_ack_rate" + UUID.randomUUID();
+        testMessageAckRateMetric(topicName, false);
+    }
+
+    private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics)
+            throws Exception {
+        final int messages = 100;
+        String subName = "test_sub";
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionName(subName).isAckReceiptEnabled(true).subscribe();
+
+        String namespace = TopicName.get(topicName).getNamespace();
+
+        for (int i = 0; i < messages; i++) {
+            producer.send(UUID.randomUUID().toString());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            Message<String> message = consumer.receive(20, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+
+            consumer.acknowledge(message);
+        }
+
+        Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        Subscription subscription = topic.getSubscription(subName);
+        List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers();
+        Assert.assertEquals(consumers.size(), 1);
+        org.apache.pulsar.broker.service.Consumer consumer1 = consumers.get(0);
+        consumer1.updateRates();
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
+        String metricStr = output.toString(StandardCharsets.UTF_8);
+
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
+        Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate");
+        Assert.assertTrue(metrics.size() > 0);
+
+        int num = 0;
+        for (PrometheusMetricsTest.Metric metric : metrics) {
+            if (exposeTopicLevelMetrics && metric.tags.get("subscription").equals(subName)) {
+                num++;
+                Assert.assertTrue(metric.value > 0);
+            } else if (!exposeTopicLevelMetrics && metric.tags.get("namespace").equals(namespace)) {
+                num++;
+                Assert.assertTrue(metric.value > 0);
+            }
+        }
+
+        Assert.assertTrue(num > 0);
+    }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index c78c17c8d9d..4c165083ab1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -40,6 +40,11 @@ public interface ConsumerStats {
     /** Total rate of messages redelivered by this consumer (msg/s). */
     double getMsgRateRedeliver();
 
+    /**
+     * Total rate of messages acknowledged by this consumer (msg/s).
+     */
+    double getMessageAckRate();
+
     /** The total rate of chunked messages delivered to this consumer. */
     double getChunkedMessageRate();
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 4175afb7140..39738d2eaa0 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -40,6 +40,11 @@ public interface SubscriptionStats {
     /** Total rate of messages redelivered on this subscription (msg/s). */
     double getMsgRateRedeliver();
 
+    /**
+     * Total rate of messages acknowledged on this subscription (msg/s).
+     */
+    double getMessageAckRate();
+
     /** Chunked message dispatch rate. */
     int getChunkedMessageRate();
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index 33d0a8d1344..ae9ba67148f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -45,6 +45,11 @@ public class ConsumerStatsImpl implements ConsumerStats {
     /** Total rate of messages redelivered by this consumer (msg/s). */
     public double msgRateRedeliver;
 
+    /**
+     * Total rate of messages acknowledged by this consumer (msg/s).
+     */
+    public double messageAckRate;
+
     /** The total rate of chunked messages delivered to this consumer. */
     public double chunkedMessageRate;
 
@@ -103,6 +108,7 @@ public class ConsumerStatsImpl implements ConsumerStats {
     public ConsumerStatsImpl add(ConsumerStatsImpl stats) {
         Objects.requireNonNull(stats);
         this.msgRateOut += stats.msgRateOut;
+        this.messageAckRate += stats.messageAckRate;
         this.msgThroughputOut += stats.msgThroughputOut;
         this.bytesOutCounter += stats.bytesOutCounter;
         this.msgOutCounter += stats.msgOutCounter;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 8beaa6facfd..53112295aa2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -47,6 +47,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
     /** Total rate of messages redelivered on this subscription (msg/s). */
     public double msgRateRedeliver;
 
+    /**
+     * Total rate of messages acknowledged on this subscription (msg/s).
+     */
+    public double messageAckRate;
+
     /** Chunked message dispatch rate. */
     public int chunkedMessageRate;
 
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 337c6bb2cd4..e66c2b4a70e 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -716,6 +716,9 @@ message CommandConsumerStatsResponse {
 
         /// Number of messages in the subscription backlog
         optional uint64 msgBacklog                  = 15;
+
+        /// Total rate of messages acknowledged by the consumer (msg/s)
+        optional double messageAckRate              = 16;
 }
 
 message CommandGetLastMessageId {