You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/13 02:42:34 UTC

[pulsar] branch branch-2.10 updated (7c88bd1b20b -> ff7762b21be)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 7c88bd1b20b [fix][security] Add timeout of sync methods and avoid call sync method for AuthoriationService (#15694)
     new 49087fa4fb7 [Function] provide default error handler for function log appender (#15728)
     new 99cc4a96475 non-persistent topic metrics (#13827)
     new 9796eb40bc6 [broker][monitoring] add message ack rate metric for consumer (#15674)
     new ff7762b21be Fix cherry-pick issue of using JDK10 API

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/service/Consumer.java | 24 +++++--
 .../apache/pulsar/broker/service/ServerCnx.java    |  1 +
 .../pulsar/broker/service/StreamingStats.java      |  1 +
 .../nonpersistent/NonPersistentSubscription.java   |  1 +
 .../service/nonpersistent/NonPersistentTopic.java  |  4 ++
 .../service/persistent/PersistentSubscription.java |  1 +
 .../broker/service/persistent/PersistentTopic.java |  3 +
 .../stats/prometheus/AggregatedConsumerStats.java  |  2 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  2 +
 .../prometheus/AggregatedSubscriptionStats.java    |  4 ++
 .../stats/prometheus/NamespaceStatsAggregator.java | 84 ++++++++++++++--------
 .../pulsar/broker/stats/prometheus/TopicStats.java |  9 +++
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 81 +++++++++++++++++++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 40 ++++++++++-
 .../pulsar/common/policies/data/ConsumerStats.java |  5 ++
 .../common/policies/data/SubscriptionStats.java    |  5 ++
 .../policies/data/stats/ConsumerStatsImpl.java     |  6 ++
 .../policies/data/stats/SubscriptionStatsImpl.java |  5 ++
 pulsar-common/src/main/proto/PulsarApi.proto       |  3 +
 .../pulsar/functions/instance/LogAppender.java     | 19 ++++-
 20 files changed, 260 insertions(+), 40 deletions(-)


[pulsar] 01/04: [Function] provide default error handler for function log appender (#15728)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 49087fa4fb763339a40cffe592fbcc774765b1ba
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Sun Jun 12 18:05:25 2022 -0700

    [Function] provide default error handler for function log appender (#15728)
    
    (cherry picked from commit f7635ec6d99bd5a13a31c7e9f17640746afec43c)
---
 .../apache/pulsar/functions/instance/LogAppender.java | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
index 5250e4c3cef..956717975e5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/LogAppender.java
@@ -23,6 +23,7 @@ import org.apache.logging.log4j.core.Appender;
 import org.apache.logging.log4j.core.ErrorHandler;
 import org.apache.logging.log4j.core.Layout;
 import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.DefaultErrorHandler;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -35,6 +36,11 @@ import java.util.concurrent.TimeUnit;
  * to a log topic.
  */
 public class LogAppender implements Appender {
+
+    private static final String LOG_LEVEL = "loglevel";
+    private static final String INSTANCE = "instance";
+    private static final String FQN = "fqn";
+
     private PulsarClient pulsarClient;
     private String logTopic;
     private String fqn;
@@ -48,15 +54,16 @@ public class LogAppender implements Appender {
         this.logTopic = logTopic;
         this.fqn = fqn;
         this.instance = instance;
+        this.errorHandler = new DefaultErrorHandler(this);
     }
 
     @Override
     public void append(LogEvent logEvent) {
         producer.newMessage()
                 .value(logEvent.getMessage().getFormattedMessage().getBytes(StandardCharsets.UTF_8))
-                .property("loglevel", logEvent.getLevel().name())
-                .property("instance", instance)
-                .property("fqn", fqn)
+                .property(LOG_LEVEL, logEvent.getLevel().name())
+                .property(INSTANCE, instance)
+                .property(FQN, fqn)
                 .sendAsync();
     }
 
@@ -82,6 +89,12 @@ public class LogAppender implements Appender {
 
     @Override
     public void setHandler(ErrorHandler errorHandler) {
+        if (errorHandler == null) {
+            throw new RuntimeException("The log error handler cannot be set to null");
+        }
+        if (isStarted()) {
+            throw new RuntimeException("The log error handler cannot be changed once the appender is started");
+        }
         this.errorHandler = errorHandler;
     }
 


[pulsar] 04/04: Fix cherry-pick issue of using JDK10 API

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ff7762b21bed2a75a49fa6710176f97a8a3a707b
Author: penghui <pe...@apache.org>
AuthorDate: Mon Jun 13 10:42:21 2022 +0800

    Fix cherry-pick issue of using JDK10 API
---
 .../src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index e064e56e993..412efe69632 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -282,7 +282,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
 
         ByteArrayOutputStream output = new ByteArrayOutputStream();
         PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
-        String metricStr = output.toString(StandardCharsets.UTF_8);
+        String metricStr = output.toString(StandardCharsets.UTF_8.name());
 
         Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
         Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate");


[pulsar] 02/04: non-persistent topic metrics (#13827)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 99cc4a96475552ce14be8338005a45bedda38b55
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Tue Feb 22 09:56:30 2022 +0800

    non-persistent topic metrics (#13827)
    
    ### Motivation
    Non-persistent topics do not expose subscription metrics.
    
    ### Modifications
    
    Expose a new non-persistent subscription metric: `pulsar_subscription_msg_drop_rate`
    
    (cherry picked from commit d548bc4ef3b785325b43c737f347ac702a840602)
---
 .../prometheus/AggregatedSubscriptionStats.java    |  2 +
 .../stats/prometheus/NamespaceStatsAggregator.java | 81 +++++++++++++---------
 .../pulsar/broker/stats/prometheus/TopicStats.java |  2 +
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 40 ++++++++++-
 4 files changed, 93 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index fb74daf419f..c829be28e59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -58,5 +58,7 @@ public class AggregatedSubscriptionStats {
 
     long totalMsgExpired;
 
+    double msgDropRate;
+
     public Map<Consumer, AggregatedConsumerStats> consumerStat = new HashMap<>();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index 7e36c5612d0..a3a0fcda445 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -31,7 +31,10 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
+import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.apache.pulsar.compaction.CompactedTopicContext;
@@ -109,6 +112,37 @@ public class NamespaceStatsAggregator {
         return Optional.ofNullable(compactor).map(c -> c.getStats());
     }
 
+    private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subscriptionStats,
+                                            AggregatedSubscriptionStats subsStats) {
+        stats.subscriptionsCount++;
+        stats.msgBacklog += subscriptionStats.msgBacklog;
+        subsStats.msgBacklog = subscriptionStats.msgBacklog;
+        subsStats.msgDelayed = subscriptionStats.msgDelayed;
+        subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
+        subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
+        subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
+        subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
+        subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
+        subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
+        subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
+        subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
+        subscriptionStats.consumers.forEach(cStats -> {
+            stats.consumersCount++;
+            subsStats.unackedMessages += cStats.unackedMessages;
+            subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
+            subsStats.msgRateOut += cStats.msgRateOut;
+            subsStats.msgThroughputOut += cStats.msgThroughputOut;
+            subsStats.bytesOutCounter += cStats.bytesOutCounter;
+            subsStats.msgOutCounter += cStats.msgOutCounter;
+            if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
+                subsStats.blockedSubscriptionOnUnackedMsgs = true;
+            }
+        });
+        stats.rateOut += subsStats.msgRateOut;
+        stats.throughputOut += subsStats.msgThroughputOut;
+
+    }
+
     private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
             boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
                                       Optional<CompactorMXBean> compactorMXBean) {
@@ -141,7 +175,6 @@ public class NamespaceStatsAggregator {
             stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
             stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
         }
-
         TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false);
         stats.msgInCounter = tStatus.msgInCounter;
         stats.bytesInCounter = tStatus.bytesInCounter;
@@ -175,37 +208,23 @@ public class NamespaceStatsAggregator {
             }
         });
 
-        tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
-            stats.subscriptionsCount++;
-            stats.msgBacklog += subscriptionStats.msgBacklog;
-
-            AggregatedSubscriptionStats subsStats = stats.subscriptionStats
-                    .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
-            subsStats.msgBacklog = subscriptionStats.msgBacklog;
-            subsStats.msgDelayed = subscriptionStats.msgDelayed;
-            subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
-            subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
-            subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
-            subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
-            subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
-            subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
-            subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
-            subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
-            subscriptionStats.consumers.forEach(cStats -> {
-                stats.consumersCount++;
-                subsStats.unackedMessages += cStats.unackedMessages;
-                subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
-                subsStats.msgRateOut += cStats.msgRateOut;
-                subsStats.msgThroughputOut += cStats.msgThroughputOut;
-                subsStats.bytesOutCounter += cStats.bytesOutCounter;
-                subsStats.msgOutCounter += cStats.msgOutCounter;
-                if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
-                    subsStats.blockedSubscriptionOnUnackedMsgs = true;
-                }
+        if (topic instanceof PersistentTopic) {
+            tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
+                AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+                        .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
+                aggregateTopicStats(stats, subscriptionStats, subsStats);
             });
-            stats.rateOut += subsStats.msgRateOut;
-            stats.throughputOut += subsStats.msgThroughputOut;
-        });
+        } else {
+            ((NonPersistentTopicStatsImpl) tStatus).getNonPersistentSubscriptions()
+                    .forEach((subName, nonPersistentSubscriptionStats) -> {
+                NonPersistentSubscriptionStatsImpl subscriptionStats =
+                                (NonPersistentSubscriptionStatsImpl) nonPersistentSubscriptionStats;
+                AggregatedSubscriptionStats subsStats = stats.subscriptionStats
+                        .computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
+                aggregateTopicStats(stats, subscriptionStats, subsStats);
+                subsStats.msgDropRate += subscriptionStats.getMsgDropRate();
+            });
+        }
 
         // Consumer stats can be a lot if a subscription has many consumers
         if (includeConsumerMetrics) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index f0556e8da90..35dbdba274a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -266,6 +266,8 @@ class TopicStats {
                     subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
                     subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
+                    subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
             subsStats.consumerStat.forEach((c, consumerStats) -> {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 3f64619a983..c4823c8b0e7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -455,7 +455,6 @@ public class PrometheusMetricsTest extends BrokerTestBase {
                 .subscribe();
 
         final int messages = 10;
-
         for (int i = 0; i < messages; i++) {
             String message = "my-message-" + i;
             p1.send(message.getBytes());
@@ -491,6 +490,45 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertTrue(metrics.containsKey("pulsar_lb_bundles_split_count"));
     }
 
+    @Test
+    public void testNonPersistentSubMetrics() throws Exception {
+        Producer<byte[]> p1 =
+                pulsarClient.newProducer().topic("non-persistent://my-property/use/my-ns/my-topic1").create();
+
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+                .topic("non-persistent://my-property/use/my-ns/my-topic1")
+                .subscriptionName("test")
+                .subscribe();
+
+        final int messages = 100;
+
+        for (int i = 0; i < messages; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+        }
+
+        for (int i = 0; i < messages; i++) {
+            c1.acknowledge(c1.receive());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        assertTrue(metrics.containsKey("pulsar_subscription_back_log"));
+        assertTrue(metrics.containsKey("pulsar_subscription_back_log_no_delayed"));
+        assertTrue(metrics.containsKey("pulsar_subscription_msg_throughput_out"));
+        assertTrue(metrics.containsKey("pulsar_throughput_out"));
+        assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_redeliver"));
+        assertTrue(metrics.containsKey("pulsar_subscription_unacked_messages"));
+        assertTrue(metrics.containsKey("pulsar_subscription_blocked_on_unacked_messages"));
+        assertTrue(metrics.containsKey("pulsar_subscription_msg_rate_out"));
+        assertTrue(metrics.containsKey("pulsar_out_bytes_total"));
+        assertTrue(metrics.containsKey("pulsar_out_messages_total"));
+        assertTrue(metrics.containsKey("pulsar_subscription_last_expire_timestamp"));
+        assertTrue(metrics.containsKey("pulsar_subscription_msg_drop_rate"));
+    }
+
     @Test
     public void testPerNamespaceStats() throws Exception {
         Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();


[pulsar] 03/04: [broker][monitoring] add message ack rate metric for consumer (#15674)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9796eb40bc68b0aefde722fd1f4f698812fca7a2
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Mon Jun 13 09:56:50 2022 +0800

    [broker][monitoring] add message ack rate metric for consumer (#15674)
    
    (cherry picked from commit 88b47e5e5ac1b7fcf895bd72b0545b24bdf61f7e)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 24 +++++--
 .../apache/pulsar/broker/service/ServerCnx.java    |  1 +
 .../pulsar/broker/service/StreamingStats.java      |  1 +
 .../nonpersistent/NonPersistentSubscription.java   |  1 +
 .../service/nonpersistent/NonPersistentTopic.java  |  4 ++
 .../service/persistent/PersistentSubscription.java |  1 +
 .../broker/service/persistent/PersistentTopic.java |  3 +
 .../stats/prometheus/AggregatedConsumerStats.java  |  2 +
 .../stats/prometheus/AggregatedNamespaceStats.java |  2 +
 .../prometheus/AggregatedSubscriptionStats.java    |  2 +
 .../stats/prometheus/NamespaceStatsAggregator.java |  3 +
 .../pulsar/broker/stats/prometheus/TopicStats.java |  7 ++
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 81 ++++++++++++++++++++++
 .../pulsar/common/policies/data/ConsumerStats.java |  5 ++
 .../common/policies/data/SubscriptionStats.java    |  5 ++
 .../policies/data/stats/ConsumerStatsImpl.java     |  6 ++
 .../policies/data/stats/SubscriptionStatsImpl.java |  5 ++
 pulsar-common/src/main/proto/PulsarApi.proto       |  3 +
 18 files changed, 151 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 3f50ab7a7aa..349fbd860e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -86,6 +86,7 @@ public class Consumer {
     private final Rate msgRedeliver;
     private final LongAdder msgOutCounter;
     private final LongAdder bytesOutCounter;
+    private final Rate messageAckRate;
 
     private long lastConsumedTimestamp;
     private long lastAckedTimestamp;
@@ -162,6 +163,7 @@ public class Consumer {
         this.msgRedeliver = new Rate();
         this.bytesOutCounter = new LongAdder();
         this.msgOutCounter = new LongAdder();
+        this.messageAckRate = new Rate();
         this.appId = appId;
 
         // Ensure we start from compacted view
@@ -366,6 +368,8 @@ public class Consumer {
     }
 
     public CompletableFuture<Void> messageAcked(CommandAck ack) {
+        CompletableFuture<Void> future;
+
         this.lastAckedTimestamp = System.currentTimeMillis();
         Map<String, Long> properties = Collections.emptyMap();
         if (ack.getPropertiesCount() > 0) {
@@ -399,20 +403,27 @@ public class Consumer {
             }
             if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) {
                 List<PositionImpl> positionsAcked = Collections.singletonList(position);
-                return transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
+                future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(),
                         ack.getTxnidLeastBits(), positionsAcked);
             } else {
                 List<Position> positionsAcked = Collections.singletonList(position);
                 subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties);
-                return CompletableFuture.completedFuture(null);
+                future = CompletableFuture.completedFuture(null);
             }
         } else {
             if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) {
-                return individualAckWithTransaction(ack);
+                future = individualAckWithTransaction(ack);
             } else {
-                return individualAckNormal(ack, properties);
+                future = individualAckNormal(ack, properties);
             }
         }
+
+        return future
+                .whenComplete((__, t) -> {
+                    if (t == null) {
+                        this.messageAckRate.recordEvent(ack.getMessageIdsCount());
+                    }
+                });
     }
 
     //this method is for individual ack not carry the transaction
@@ -742,7 +753,10 @@ public class Consumer {
         msgOut.calculateRate();
         chunkedMessageRate.calculateRate();
         msgRedeliver.calculateRate();
+        messageAckRate.calculateRate();
+
         stats.msgRateOut = msgOut.getRate();
+        stats.messageAckRate = messageAckRate.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
         stats.msgRateRedeliver = msgRedeliver.getRate();
         stats.chunkedMessageRate = chunkedMessageRate.getRate();
@@ -755,7 +769,7 @@ public class Consumer {
         lastAckedTimestamp = consumerStats.lastAckedTimestamp;
         lastConsumedTimestamp = consumerStats.lastConsumedTimestamp;
         MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits);
-        if (log.isDebugEnabled()){
+        if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", topicName,
                     subscription, consumerStats.availablePermits, consumerId);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index c4f9e590caa..f127acb1fa7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -612,6 +612,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 .setConnectedSince(consumerStats.getConnectedSince())
                 .setMsgBacklog(subscription.getNumberOfEntriesInBacklog(false))
                 .setMsgRateExpired(subscription.getExpiredMessageRate())
+                .setMessageAckRate(consumerStats.messageAckRate)
                 .setType(subscription.getTypeString());
 
         return Commands.serializeWithSize(cmd);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
index 02dcb8233ff..469c802b76a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StreamingStats.java
@@ -65,6 +65,7 @@ public class StreamingStats {
         statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
         statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);
         statsStream.writePair("avgMessagesPerEntry", stats.avgMessagesPerEntry);
+        statsStream.writePair("messageAckRate", stats.messageAckRate);
 
         if (Subscription.isIndividualAckMode(subType)) {
             statsStream.writePair("unackedMessages", stats.unackedMessages);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index a9777f5dd0d..fed3b0f851a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -456,6 +456,7 @@ public class NonPersistentSubscription implements Subscription {
                 ConsumerStatsImpl consumerStats = consumer.getStats();
                 subStats.consumers.add(consumerStats);
                 subStats.msgRateOut += consumerStats.msgRateOut;
+                subStats.messageAckRate += consumerStats.messageAckRate;
                 subStats.msgThroughputOut += consumerStats.msgThroughputOut;
                 subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                 subStats.msgOutCounter += consumerStats.msgOutCounter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 82d610c12b7..fb6da69eb2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -724,6 +724,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
             double subMsgRateOut = 0;
             double subMsgThroughputOut = 0;
             double subMsgRateRedeliver = 0;
+            double subMsgAckRate = 0;
 
             // Start subscription name & consumers
             try {
@@ -738,6 +739,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
 
                     ConsumerStatsImpl consumerStats = consumer.getStats();
                     subMsgRateOut += consumerStats.msgRateOut;
+                    subMsgAckRate += consumerStats.messageAckRate;
+
                     subMsgThroughputOut += consumerStats.msgThroughputOut;
                     subMsgRateRedeliver += consumerStats.msgRateRedeliver;
 
@@ -752,6 +755,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
                 topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false));
                 topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
+                topicStatsStream.writePair("messageAckRate", subMsgAckRate);
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("type", subscription.getTypeString());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 95dee2a3798..31ba5115ef6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -966,6 +966,7 @@ public class PersistentSubscription implements Subscription {
                 subStats.bytesOutCounter += consumerStats.bytesOutCounter;
                 subStats.msgOutCounter += consumerStats.msgOutCounter;
                 subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
+                subStats.messageAckRate += consumerStats.messageAckRate;
                 subStats.chunkedMessageRate += consumerStats.chunkedMessageRate;
                 subStats.unackedMessages += consumerStats.unackedMessages;
                 subStats.lastConsumedTimestamp =
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b1ccf0b23ec..629158c4d50 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1722,6 +1722,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             double subMsgRateOut = 0;
             double subMsgThroughputOut = 0;
             double subMsgRateRedeliver = 0;
+            double subMsgAckRate = 0;
 
             // Start subscription name & consumers
             try {
@@ -1735,6 +1736,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
 
                     ConsumerStatsImpl consumerStats = consumer.getStats();
                     subMsgRateOut += consumerStats.msgRateOut;
+                    subMsgAckRate += consumerStats.messageAckRate;
                     subMsgThroughputOut += consumerStats.msgThroughputOut;
                     subMsgRateRedeliver += consumerStats.msgRateRedeliver;
 
@@ -1749,6 +1751,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         subscription.getNumberOfEntriesInBacklog(true));
                 topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
+                topicStatsStream.writePair("messageAckRate", subMsgAckRate);
                 topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
                 topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
                 topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
index 8b6bf7d5c96..0a4bd317df5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedConsumerStats.java
@@ -28,6 +28,8 @@ public class AggregatedConsumerStats {
 
     public double msgRateOut;
 
+    public double msgAckRate;
+
     public double msgThroughputOut;
 
     public long availablePermits;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index 8bacd0f582d..5610dbab218 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -33,6 +33,7 @@ public class AggregatedNamespaceStats {
     public double throughputIn;
     public double throughputOut;
 
+    public double messageAckRate; // summed consumer ack rates (msg/s); double so "+=" does not truncate
     public long bytesInCounter;
     public long msgInCounter;
     public long bytesOutCounter;
@@ -122,6 +123,7 @@ public class AggregatedNamespaceStats {
                 consumerStats.blockedSubscriptionOnUnackedMsgs = v.blockedSubscriptionOnUnackedMsgs;
                 consumerStats.msgRateRedeliver += v.msgRateRedeliver;
                 consumerStats.unackedMessages += v.unackedMessages;
+                messageAckRate += v.msgAckRate;
             });
         });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
index c829be28e59..2a3d4ed533a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedSubscriptionStats.java
@@ -36,6 +36,8 @@ public class AggregatedSubscriptionStats {
 
     public double msgRateOut;
 
+    public double messageAckRate;
+
     public double msgThroughputOut;
 
     public long msgDelayed;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index a3a0fcda445..229739b2fce 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -131,6 +131,7 @@ public class NamespaceStatsAggregator {
             subsStats.unackedMessages += cStats.unackedMessages;
             subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
             subsStats.msgRateOut += cStats.msgRateOut;
+            subsStats.messageAckRate += cStats.messageAckRate;
             subsStats.msgThroughputOut += cStats.msgThroughputOut;
             subsStats.bytesOutCounter += cStats.bytesOutCounter;
             subsStats.msgOutCounter += cStats.msgOutCounter;
@@ -240,6 +241,7 @@ public class NamespaceStatsAggregator {
                     consumerStats.unackedMessages = conStats.unackedMessages;
                     consumerStats.msgRateRedeliver = conStats.msgRateRedeliver;
                     consumerStats.msgRateOut = conStats.msgRateOut;
+                    consumerStats.msgAckRate = conStats.messageAckRate;
                     consumerStats.msgThroughputOut = conStats.msgThroughputOut;
                     consumerStats.bytesOutCounter = conStats.bytesOutCounter;
                     consumerStats.msgOutCounter = conStats.msgOutCounter;
@@ -324,6 +326,7 @@ public class NamespaceStatsAggregator {
         metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
         metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
         metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
+        metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
 
         metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
         metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 35dbdba274a..46be437eb23 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -246,6 +246,8 @@ class TopicStats {
                     subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
                     subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
+            metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
+                    subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
                     subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
             metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
@@ -282,6 +284,11 @@ class TopicStats {
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
                         splitTopicAndPartitionIndexLabel);
+
+                metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
+                        "pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
+                        splitTopicAndPartitionIndexLabel);
+
                 metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
                         "pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
                         splitTopicAndPartitionIndexLabel);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 2f702ab89f6..e064e56e993 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -20,14 +20,22 @@ package org.apache.pulsar.broker.stats;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -38,9 +46,13 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
@@ -183,6 +195,7 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
                 "msgThroughputOut",
                 "bytesOutCounter",
                 "msgOutCounter",
+                "messageAckRate",
                 "msgRateRedeliver",
                 "chunkedMessageRate",
                 "consumerName",
@@ -220,4 +233,72 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
 
         consumer.close();
     }
+
+
+    @Test
+    public void testPersistentTopicMessageAckRateMetricTopicLevel() throws Exception {
+        // Random UUID suffix gives each invocation its own topic; "true" selects topic-level metrics.
+        testMessageAckRateMetric("persistent://public/default/msg_ack_rate" + UUID.randomUUID(), true);
+    }
+
+    @Test
+    public void testPersistentTopicMessageAckRateMetricNamespaceLevel() throws Exception {
+        // Random UUID suffix gives each invocation its own topic; "false" selects namespace-level metrics.
+        testMessageAckRateMetric("persistent://public/default/msg_ack_rate" + UUID.randomUUID(), false);
+    }
+
+    /**
+     * Produces and acknowledges messages on {@code topicName}, then verifies that the generated
+     * Prometheus output has a positive {@code pulsar_consumer_msg_ack_rate} sample tagged with the
+     * test subscription (topic-level metrics) or with the topic's namespace (namespace-level metrics).
+     */
+    private void testMessageAckRateMetric(String topicName, boolean exposeTopicLevelMetrics)
+            throws Exception {
+        final int messages = 100;
+        String subName = "test_sub";
+        String namespace = TopicName.get(topicName).getNamespace();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
+                .subscriptionName(subName).isAckReceiptEnabled(true).subscribe();
+
+        for (int i = 0; i < messages; i++) {
+            producer.send(UUID.randomUUID().toString());
+        }
+        for (int i = 0; i < messages; i++) {
+            Message<String> message = consumer.receive(20, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            consumer.acknowledge(message);
+        }
+
+        Topic topic = pulsar.getBrokerService().getTopic(topicName, false).get().get();
+        Subscription subscription = topic.getSubscription(subName);
+        List<org.apache.pulsar.broker.service.Consumer> consumers = subscription.getConsumers();
+        Assert.assertEquals(consumers.size(), 1);
+        consumers.get(0).updateRates();
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, exposeTopicLevelMetrics, true, true, output);
+        // toString(String) instead of toString(Charset): the Charset overload only exists since JDK 10.
+        String metricStr = output.toString(StandardCharsets.UTF_8.name());
+
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricStr);
+        Collection<PrometheusMetricsTest.Metric> metrics = metricsMap.get("pulsar_consumer_msg_ack_rate");
+        Assert.assertTrue(metrics.size() > 0);
+
+        String tagKey = exposeTopicLevelMetrics ? "subscription" : "namespace";
+        String expectedTag = exposeTopicLevelMetrics ? subName : namespace;
+        int num = 0;
+        for (PrometheusMetricsTest.Metric metric : metrics) {
+            if (expectedTag.equals(metric.tags.get(tagKey))) {
+                num++;
+                Assert.assertTrue(metric.value > 0);
+            }
+        }
+        Assert.assertTrue(num > 0);
+    }
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
index c78c17c8d9d..4c165083ab1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ConsumerStats.java
@@ -40,6 +40,11 @@ public interface ConsumerStats {
     /** Total rate of messages redelivered by this consumer (msg/s). */
     double getMsgRateRedeliver();
 
+    /**
+     * Total rate of messages acknowledged by this consumer (msg/s).
+     */
+    double getMessageAckRate();
+
     /** The total rate of chunked messages delivered to this consumer. */
     double getChunkedMessageRate();
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
index 4175afb7140..39738d2eaa0 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/SubscriptionStats.java
@@ -40,6 +40,11 @@ public interface SubscriptionStats {
     /** Total rate of messages redelivered on this subscription (msg/s). */
     double getMsgRateRedeliver();
 
+    /**
+     * Total rate of messages acknowledged on this subscription (msg/s).
+     */
+    double getMessageAckRate();
+
     /** Chunked message dispatch rate. */
     int getChunkedMessageRate();
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
index 33d0a8d1344..ae9ba67148f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/ConsumerStatsImpl.java
@@ -45,6 +45,11 @@ public class ConsumerStatsImpl implements ConsumerStats {
     /** Total rate of messages redelivered by this consumer (msg/s). */
     public double msgRateRedeliver;
 
+    /**
+     * Total rate of messages acknowledged by this consumer (msg/s).
+     */
+    public double messageAckRate;
+
     /** The total rate of chunked messages delivered to this consumer. */
     public double chunkedMessageRate;
 
@@ -103,6 +108,7 @@ public class ConsumerStatsImpl implements ConsumerStats {
     public ConsumerStatsImpl add(ConsumerStatsImpl stats) {
         Objects.requireNonNull(stats);
         this.msgRateOut += stats.msgRateOut;
+        this.messageAckRate += stats.messageAckRate;
         this.msgThroughputOut += stats.msgThroughputOut;
         this.bytesOutCounter += stats.bytesOutCounter;
         this.msgOutCounter += stats.msgOutCounter;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
index 8beaa6facfd..53112295aa2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java
@@ -47,6 +47,11 @@ public class SubscriptionStatsImpl implements SubscriptionStats {
     /** Total rate of messages redelivered on this subscription (msg/s). */
     public double msgRateRedeliver;
 
+    /**
+     * Total rate of messages acknowledged on this subscription (msg/s).
+     */
+    public double messageAckRate;
+
     /** Chunked message dispatch rate. */
     public int chunkedMessageRate;
 
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 337c6bb2cd4..e66c2b4a70e 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -716,6 +716,9 @@ message CommandConsumerStatsResponse {
 
         /// Number of messages in the subscription backlog
         optional uint64 msgBacklog                  = 15;
+
+        /// Total rate of acknowledged messages (msg/s)
+        optional double messageAckRate              = 16;
 }
 
 message CommandGetLastMessageId {