You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/02 03:05:28 UTC

[pulsar] 03/05: Expose compaction metrics to Prometheus (#11739)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a80f1e8774d1c8f6505f19ed8ec1b97d75fa47d9
Author: GuoJiwei <te...@apache.org>
AuthorDate: Tue Aug 31 22:58:44 2021 +0800

    Expose compaction metrics to Prometheus (#11739)
    
    ### Motivation
    As #11564 has involved compaction metrics in CLI, it's extremely useful to expose relative metrics to Prometheus.
    
    - pulsar_compaction_removed_event_count :  the removed event count of compaction .
    - pulsar_compaction_succeed_count : the succeed count of compaction.
    - pulsar_compaction_failed_count : the failed count of compaction.
    - pulsar_compaction_duration_time_in_millis : the duration time of compaction.
    - pulsar_compaction_read_throughput : the read throughput of compaction.
    - pulsar_compaction_write_throughput : the write throughput of compaction.
    - pulsar_compaction_compacted_entries_count: the compacted entries count.
    - pulsar_compaction_compacted_entries_size: the compacted entries size;
    
    if users enable the topic level metrics and the topic has been compacted or doing the compaction, we should expose the compaction metrics for the topic level. If users disable the topic level metrics, we should expose the aggregated compaction metrics for the namespace level.
    
    
    (cherry picked from commit 656635ef7638c9df6be9513bd38d34f6872b2b29)
---
 .../broker/service/persistent/PersistentTopic.java |  46 ++++----
 .../broker/stats/metrics/AbstractMetrics.java      |  23 +---
 .../stats/prometheus/AggregatedNamespaceStats.java |  22 ++++
 .../stats/prometheus/NamespaceStatsAggregator.java |  53 ++++++++-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  73 +++++++++++-
 .../pulsar/compaction/CompactedTopicContext.java   |  37 ++++++
 .../pulsar/compaction/CompactedTopicImpl.java      |  12 --
 .../apache/pulsar/compaction/CompactionRecord.java | 130 +++++++++++++++++++++
 .../apache/pulsar/compaction/CompactorMXBean.java  |  26 +----
 .../pulsar/compaction/CompactorMXBeanImpl.java     |  65 ++++-------
 .../pulsar/compaction/TwoPhaseCompactor.java       |  10 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  93 +++++++++++++++
 .../prometheus/AggregatedNamespaceStatsTest.java   |  25 ++++
 .../pulsar/compaction/CompactorMXBeanImplTest.java |  40 +++++--
 .../apache/pulsar/compaction/CompactorTest.java    |   9 +-
 site2/docs/reference-metrics.md                    |  10 ++
 16 files changed, 539 insertions(+), 135 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b017750..a4db0bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -149,6 +149,7 @@ import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
@@ -1902,15 +1903,16 @@ public class PersistentTopic extends AbstractTopic
         stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
         stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
         Optional<CompactorMXBean> mxBean = getCompactorMXBean();
-        stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
-                stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
-        stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
-                stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
-        stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
-                stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
-        stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
-                stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);
 
+        stats.compaction.reset();
+        mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> {
+            stats.compaction.lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
+            stats.compaction.lastCompactionSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
+            stats.compaction.lastCompactionFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
+            stats.compaction.lastCompactionDurationTimeInMills =
+                    compactionRecord.getLastCompactionDurationTimeInMills();
+            return compactionRecord;
+        });
         return stats;
     }
 
@@ -1987,19 +1989,14 @@ public class PersistentTopic extends AbstractTopic
         info.entries = -1;
         info.size = -1;
 
-        try {
-            Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext =
-                    ((CompactedTopicImpl) compactedTopic)
-                            .getCompactedTopicContext();
-            if (compactedTopicContext.isPresent()) {
-                CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get();
-                info.ledgerId = ledgerContext.getLedger().getId();
-                info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
-                info.size = ledgerContext.getLedger().getLength();
-            }
-        } catch (ExecutionException | InterruptedException e) {
-            log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
+        Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
+        if (compactedTopicContext.isPresent()) {
+            CompactedTopicContext ledgerContext = compactedTopicContext.get();
+            info.ledgerId = ledgerContext.getLedger().getId();
+            info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
+            info.size = ledgerContext.getLedger().getLength();
         }
+
         stats.compactedLedger = info;
 
         stats.cursors = Maps.newTreeMap();
@@ -2116,6 +2113,15 @@ public class PersistentTopic extends AbstractTopic
         return statFuture;
     }
 
+    public Optional<CompactedTopicContext> getCompactedTopicContext() {
+        try {
+            return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext();
+        } catch (ExecutionException | InterruptedException e) {
+            log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
+        }
+        return Optional.empty();
+    }
+
     public long getBacklogSize() {
         return ledger.getEstimatedBacklogSize();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
index ed6e79f..610d22c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
@@ -214,11 +214,7 @@ abstract class AbstractMetrics {
     }
 
     protected void populateAggregationMap(Map<String, List<Double>> map, String mkey, double value) {
-        if (!map.containsKey(mkey)) {
-            map.put(mkey, Lists.newArrayList(value));
-        } else {
-            map.get(mkey).add(value);
-        }
+        map.computeIfAbsent(mkey, __ -> Lists.newArrayList()).add(value);
     }
 
     protected void populateAggregationMapWithSum(Map<String, Double> map, String mkey, double value) {
@@ -242,24 +238,11 @@ abstract class AbstractMetrics {
      */
     protected void populateDimensionMap(Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap, Metrics metrics,
             ManagedLedgerImpl ledger) {
-        if (!ledgersByDimensionMap.containsKey(metrics)) {
-            // create new list
-            ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger));
-        } else {
-            // add to collection
-            ledgersByDimensionMap.get(metrics).add(ledger);
-        }
+        ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(ledger);
     }
 
     protected void populateDimensionMap(Map<Metrics, List<TopicStats>> topicsStatsByDimensionMap,
             Metrics metrics, TopicStats destStats) {
-        if (!topicsStatsByDimensionMap.containsKey(metrics)) {
-            // create new list
-            topicsStatsByDimensionMap.put(metrics, Lists.newArrayList(destStats));
-        } else {
-            // add to collection
-            topicsStatsByDimensionMap.get(metrics).add(destStats);
-        }
-
+        topicsStatsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(destStats);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index c049366..8bacd0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.compaction.CompactionRecord;
 
 public class AggregatedNamespaceStats {
     public int topicsCount;
@@ -47,6 +49,16 @@ public class AggregatedNamespaceStats {
 
     public Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
 
+    long compactionRemovedEventCount;
+    long compactionSucceedCount;
+    long compactionFailedCount;
+    long compactionDurationTimeInMills;
+    double compactionReadThroughput;
+    double compactionWriteThroughput;
+    long compactionCompactedEntriesCount;
+    long compactionCompactedEntriesSize;
+    StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
+
     void updateStats(TopicStats stats) {
         topicsCount++;
 
@@ -112,6 +124,16 @@ public class AggregatedNamespaceStats {
                 consumerStats.unackedMessages += v.unackedMessages;
             });
         });
+
+        compactionRemovedEventCount += stats.compactionRemovedEventCount;
+        compactionSucceedCount += stats.compactionSucceedCount;
+        compactionFailedCount += stats.compactionFailedCount;
+        compactionDurationTimeInMills += stats.compactionDurationTimeInMills;
+        compactionReadThroughput += stats.compactionReadThroughput;
+        compactionWriteThroughput += stats.compactionWriteThroughput;
+        compactionCompactedEntriesCount += stats.compactionCompactedEntriesCount;
+        compactionCompactedEntriesSize += stats.compactionCompactedEntriesSize;
+        compactionLatencyBuckets.addAll(stats.compactionLatencyBuckets);
     }
 
     public void reset() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index e041a01..a2661ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -19,9 +19,13 @@
 package org.apache.pulsar.broker.stats.prometheus;
 
 import io.netty.util.concurrent.FastThreadLocal;
+import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -30,7 +34,11 @@ import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
 
+@Slf4j
 public class NamespaceStatsAggregator {
 
     private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
@@ -57,6 +65,7 @@ public class NamespaceStatsAggregator {
 
         printDefaultBrokerStats(stream, cluster);
 
+        Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
         LongAdder topicsCount = new LongAdder();
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
@@ -66,11 +75,13 @@ public class NamespaceStatsAggregator {
                 topicsMap.forEach((name, topic) -> {
                     getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
                             pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
-                            pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus());
+                            pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
+                            compactorMXBean
+                    );
 
                     if (includeTopicMetrics) {
                         topicsCount.add(1);
-                        TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats);
+                        TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean);
                     } else {
                         namespaceStats.updateStats(topicStats);
                     }
@@ -87,8 +98,19 @@ public class NamespaceStatsAggregator {
         });
     }
 
+    private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
+        Compactor compactor = null;
+        try {
+            compactor = pulsar.getCompactor(false);
+        } catch (PulsarServerException e) {
+            log.error("get compactor error", e);
+        }
+        return Optional.ofNullable(compactor).map(c -> c.getStats());
+    }
+
     private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
-            boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
+            boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
+                                      Optional<CompactorMXBean> compactorMXBean) {
         stats.reset();
 
         if (topic instanceof PersistentTopic) {
@@ -220,6 +242,31 @@ public class NamespaceStatsAggregator {
             aggReplStats.connectedCount += replStats.connected ? 1 : 0;
             aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds;
         });
+
+        compactorMXBean
+                .flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic.getName()))
+                .map(compactionRecord -> {
+                    stats.compactionRemovedEventCount = compactionRecord.getCompactionRemovedEventCount();
+                    stats.compactionSucceedCount = compactionRecord.getCompactionSucceedCount();
+                    stats.compactionFailedCount = compactionRecord.getCompactionFailedCount();
+                    stats.compactionDurationTimeInMills = compactionRecord.getCompactionDurationTimeInMills();
+                    stats.compactionReadThroughput = compactionRecord.getCompactionReadThroughput();
+                    stats.compactionWriteThroughput = compactionRecord.getCompactionWriteThroughput();
+                    stats.compactionLatencyBuckets.addAll(compactionRecord.getCompactionLatencyStats());
+                    stats.compactionLatencyBuckets.refresh();
+                    PersistentTopic persistentTopic = (PersistentTopic) topic;
+                    Optional<CompactedTopicContext> compactedTopicContext = persistentTopic
+                            .getCompactedTopicContext();
+                    if (compactedTopicContext.isPresent()) {
+                        LedgerHandle ledger = compactedTopicContext.get().getLedger();
+                        long entries = ledger.getLastAddConfirmed() + 1;
+                        long size = ledger.getLength();
+
+                        stats.compactionCompactedEntriesCount = entries;
+                        stats.compactionCompactedEntriesSize = size;
+                    }
+                    return compactionRecord;
+                });
     }
 
     private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 9474ecb..61e0175 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -20,7 +20,11 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.compaction.CompactionRecord;
+import org.apache.pulsar.compaction.CompactorMXBean;
 
 class TopicStats {
 
@@ -51,6 +55,16 @@ class TopicStats {
     // Used for tracking duplicate TYPE definitions
     static Map<String, String> metricWithTypeDefinition = new HashMap<>();
 
+    // For compaction
+    long compactionRemovedEventCount;
+    long compactionSucceedCount;
+    long compactionFailedCount;
+    long compactionDurationTimeInMills;
+    double compactionReadThroughput;
+    double compactionWriteThroughput;
+    long compactionCompactedEntriesCount;
+    long compactionCompactedEntriesSize;
+    StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
 
     public void reset() {
         subscriptionsCount = 0;
@@ -73,6 +87,16 @@ class TopicStats {
         replicationStats.clear();
         subscriptionStats.clear();
         producerStats.clear();
+
+        compactionRemovedEventCount = 0;
+        compactionSucceedCount = 0;
+        compactionFailedCount = 0;
+        compactionDurationTimeInMills = 0;
+        compactionReadThroughput = 0;
+        compactionWriteThroughput = 0;
+        compactionCompactedEntriesCount = 0;
+        compactionCompactedEntriesSize = 0;
+        compactionLatencyBuckets.reset();
     }
 
     static void resetTypes() {
@@ -80,7 +104,7 @@ class TopicStats {
     }
 
     static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                                TopicStats stats) {
+                                TopicStats stats, Optional<CompactorMXBean> compactorMXBean) {
         metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
         metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
         metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
@@ -250,6 +274,53 @@ class TopicStats {
 
         metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter);
         metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter);
+
+        // Compaction
+        boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
+                .map(__ -> true).orElse(false);
+        if (hasCompaction) {
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
+                    stats.compactionRemovedEventCount);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
+                    stats.compactionSucceedCount);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
+                    stats.compactionFailedCount);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
+                    stats.compactionDurationTimeInMills);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
+                    stats.compactionReadThroughput);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
+                    stats.compactionWriteThroughput);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
+                    stats.compactionCompactedEntriesCount);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
+                    stats.compactionCompactedEntriesSize);
+            long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
+                    compactionLatencyBuckets[0]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
+                    compactionLatencyBuckets[1]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
+                    compactionLatencyBuckets[2]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
+                    compactionLatencyBuckets[3]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
+                    compactionLatencyBuckets[4]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
+                    compactionLatencyBuckets[5]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
+                    compactionLatencyBuckets[6]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
+                    compactionLatencyBuckets[7]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
+                    compactionLatencyBuckets[8]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
+                    compactionLatencyBuckets[9]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
+                    stats.compactionLatencyBuckets.getSum());
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
+                    stats.compactionLatencyBuckets.getCount());
+        }
     }
 
     static void metricType(SimpleTextOutputStream stream, String name) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java
new file mode 100644
index 0000000..76b0642
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.compaction;
+
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import lombok.Getter;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+
+@Getter
+public class CompactedTopicContext {
+
+    final LedgerHandle ledger;
+    final AsyncLoadingCache<Long, MessageIdData> cache;
+
+    public CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache<Long, MessageIdData> cache) {
+        this.ledger = ledger;
+        this.cache = cache;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 21e9a1d..b2f2d87 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -30,7 +30,6 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import lombok.Getter;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -274,17 +273,6 @@ public class CompactedTopicImpl implements CompactedTopic {
                 });
     }
 
-    @Getter
-    public static class CompactedTopicContext {
-        final LedgerHandle ledger;
-        final AsyncLoadingCache<Long, MessageIdData> cache;
-
-        CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache<Long, MessageIdData> cache) {
-            this.ledger = ledger;
-            this.cache = cache;
-        }
-    }
-
     /**
      * Getter for CompactedTopicContext.
      * @return CompactedTopicContext
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
new file mode 100644
index 0000000..4a82743
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.Getter;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.stats.Rate;
+
+public class CompactionRecord {
+
+    public static final long[] WRITE_LATENCY_BUCKETS_USEC = { 500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
+            200_000, 1000_000 };
+
+    @Getter
+    private long lastCompactionRemovedEventCount = 0L;
+    @Getter
+    private long lastCompactionSucceedTimestamp = 0L;
+    @Getter
+    private long lastCompactionFailedTimestamp = 0L;
+    @Getter
+    private long lastCompactionDurationTimeInMills = 0L;
+
+    private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
+    private long lastCompactionStartTimeOp;
+
+    private final LongAdder compactionRemovedEventCount = new LongAdder();
+    private final LongAdder compactionSucceedCount = new LongAdder();
+    private final LongAdder compactionFailedCount = new LongAdder();
+    private final LongAdder compactionDurationTimeInMills = new LongAdder();
+    public final StatsBuckets writeLatencyStats = new StatsBuckets(WRITE_LATENCY_BUCKETS_USEC);
+    public final Rate writeRate = new Rate();
+    public final Rate readRate = new Rate();
+
+    public void reset() {
+        compactionRemovedEventCount.reset();
+        compactionSucceedCount.reset();
+        compactionFailedCount.reset();
+        compactionDurationTimeInMills.reset();
+        writeLatencyStats.reset();
+    }
+
+    public void addCompactionRemovedEvent() {
+        lastCompactionRemovedEventCountOp.increment();
+        compactionRemovedEventCount.increment();
+    }
+
+    public void addCompactionStartOp() {
+        lastCompactionRemovedEventCountOp.reset();
+        lastCompactionStartTimeOp = System.currentTimeMillis();
+    }
+
+    public void addCompactionEndOp(boolean succeed) {
+        lastCompactionDurationTimeInMills = System.currentTimeMillis()
+                - lastCompactionStartTimeOp;
+        compactionDurationTimeInMills.add(lastCompactionDurationTimeInMills);
+        lastCompactionRemovedEventCount = lastCompactionRemovedEventCountOp.longValue();
+        if (succeed) {
+            lastCompactionSucceedTimestamp = System.currentTimeMillis();
+            compactionSucceedCount.increment();
+        } else {
+            lastCompactionFailedTimestamp = System.currentTimeMillis();
+            compactionFailedCount.increment();
+        }
+    }
+
+    public void addCompactionReadOp(long readableBytes) {
+        readRate.recordEvent(readableBytes);
+    }
+
+    public void addCompactionWriteOp(long writeableBytes) {
+        writeRate.recordEvent(writeableBytes);
+    }
+
+    public void addCompactionLatencyOp(long latency, TimeUnit unit) {
+        writeLatencyStats.addValue(unit.toMicros(latency));
+    }
+
+    public long getCompactionRemovedEventCount() {
+        return compactionRemovedEventCount.longValue();
+    }
+
+    public long getCompactionSucceedCount() {
+        return compactionSucceedCount.longValue();
+    }
+
+    public long getCompactionFailedCount() {
+        return compactionFailedCount.longValue();
+    }
+
+    public long getCompactionDurationTimeInMills() {
+        return compactionDurationTimeInMills.longValue();
+    }
+
+    public long[] getCompactionLatencyBuckets() {
+        writeLatencyStats.refresh();
+        return writeLatencyStats.getBuckets();
+    }
+
+    public StatsBuckets getCompactionLatencyStats() {
+        return writeLatencyStats;
+    }
+
+    public double getCompactionReadThroughput() {
+        readRate.calculateRate();
+        return readRate.getValueRate();
+    }
+
+    public double getCompactionWriteThroughput() {
+        writeRate.calculateRate();
+        return writeRate.getValueRate();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
index 54ca2e8..7786e19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import java.util.Optional;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 
@@ -29,29 +30,14 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability;
 public interface CompactorMXBean {
 
     /**
-     * @return the removed event count of last compaction
-     */
-    long getLastCompactionRemovedEventCount(String topic);
-
-    /**
-     * @return the timestamp of last succeed compaction
-     */
-    long getLastCompactionSucceedTimestamp(String topic);
-
-    /**
-     * @return the timestamp of last failed compaction
-     */
-    long getLastCompactionFailedTimestamp(String topic);
-
-    /**
-     * @return the duration time of last compaction
-     */
-    long getLastCompactionDurationTimeInMills(String topic);
-
-    /**
      *  Remove metrics about this topic.
      * @param topic
      */
     void removeTopic(String topic);
 
+    /**
+     *  Get the compaction record of the topic.
+     * @param topic
+     */
+    Optional<CompactionRecord> getCompactionRecordForTopic(String topic);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
index 05db2aa..7b63127 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
@@ -18,75 +18,54 @@
  */
 package org.apache.pulsar.compaction;
 
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.TimeUnit;
 
 public class CompactorMXBeanImpl implements CompactorMXBean {
 
-    private final ConcurrentHashMap<String, CompactRecord> compactRecordOps = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, CompactionRecord> compactionRecordOps = new ConcurrentHashMap<>();
 
     public void addCompactionRemovedEvent(String topic) {
-        compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent();
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionRemovedEvent();
     }
 
     public void addCompactionStartOp(String topic) {
-        compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset();
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionStartOp();
     }
 
     public void addCompactionEndOp(String topic, boolean succeed) {
-        CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord());
-        compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis()
-                - compactRecord.lastCompactionStartTimeOp;
-        compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue();
-        if (succeed) {
-            compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis();
-        } else {
-            compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis();
-        }
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionEndOp(succeed);
     }
 
     @Override
-    public long getLastCompactionRemovedEventCount(String topic) {
-        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount;
+    public void removeTopic(String topic) {
+        compactionRecordOps.remove(topic);
     }
 
     @Override
-    public long getLastCompactionSucceedTimestamp(String topic) {
-        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp;
+    public Optional<CompactionRecord> getCompactionRecordForTopic(String topic) {
+        return Optional.ofNullable(compactionRecordOps.get(topic));
     }
 
-    @Override
-    public long getLastCompactionFailedTimestamp(String topic) {
-        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp;
+    public Set<String> getTopics() {
+        return compactionRecordOps.keySet();
     }
 
-    @Override
-    public long getLastCompactionDurationTimeInMills(String topic) {
-        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills;
+    public void reset() {
+        compactionRecordOps.values().forEach(CompactionRecord::reset);
     }
 
-    @Override
-    public void removeTopic(String topic) {
-        compactRecordOps.remove(topic);
+    public void addCompactionReadOp(String topic, long readableBytes) {
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionReadOp(readableBytes);
     }
 
-    static class CompactRecord {
-
-        private long lastCompactionRemovedEventCount = 0L;
-        private long lastCompactionSucceedTimestamp = 0L;
-        private long lastCompactionFailedTimestamp = 0L;
-        private long lastCompactionDurationTimeInMills = 0L;
-
-        private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
-        private long lastCompactionStartTimeOp;
-
-        public void addCompactionRemovedEvent() {
-            lastCompactionRemovedEventCountOp.increment();
-        }
+    public void addCompactionWriteOp(String topic, long writeableBytes) {
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionWriteOp(writeableBytes);
+    }
 
-        public void reset() {
-            lastCompactionRemovedEventCountOp.reset();
-            lastCompactionStartTimeOp = System.currentTimeMillis();
-        }
+    public void addCompactionLatencyOp(String topic, long latency, TimeUnit unit) {
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionLatencyOp(latency, unit);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 49d121f..ded798c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -125,6 +126,7 @@ public class TwoPhaseCompactor extends Compactor {
                 MessageId id = m.getMessageId();
                 boolean deletedMessage = false;
                 boolean replaceMessage = false;
+                mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
                         for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
@@ -234,6 +236,7 @@ public class TwoPhaseCompactor extends Compactor {
             try {
                 MessageId id = m.getMessageId();
                 Optional<RawMessage> messageToAdd = Optional.empty();
+                mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
                         messageToAdd = RawBatchConverter.rebatchMessage(
@@ -262,7 +265,7 @@ public class TwoPhaseCompactor extends Compactor {
                     RawMessage message = messageToAdd.get();
                     try {
                         outstanding.acquire();
-                        CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message)
+                        CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message, reader.getTopic())
                                 .whenComplete((res, exception2) -> {
                                     outstanding.release();
                                     if (exception2 != null) {
@@ -363,12 +366,15 @@ public class TwoPhaseCompactor extends Compactor {
         return bkf;
     }
 
-    private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage m) {
+    private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage m, String topic) {
         CompletableFuture<Void> bkf = new CompletableFuture<>();
         ByteBuf serialized = m.serialize();
         try {
+            mxBean.addCompactionWriteOp(topic, m.getHeadersAndPayload().readableBytes());
+            long start = System.nanoTime();
             lh.asyncAddEntry(serialized,
                     (rc, ledger, eid, ctx) -> {
+                        mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
                         if (rc != BKException.Code.OK) {
                             bkf.completeExceptionally(BKException.create(rc));
                         } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 7387915..3de3a04 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -27,6 +27,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.jsonwebtoken.SignatureAlgorithm;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -43,14 +44,18 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Random;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.crypto.SecretKey;
 import javax.naming.AuthenticationException;
 import lombok.Cleanup;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -63,10 +68,13 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -1136,6 +1144,91 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         parseMetrics(sampleMetrics);
     }
 
+    @Test
+    public void testCompaction() throws Exception {
+        final String topicName = "persistent://my-namespace/use/my-ns/my-compaction1";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_compaction_removed_event_count");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_succeed_count");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_failed_count");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_duration_time_in_mills");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_read_throughput");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_write_throughput");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_count");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_size");
+        assertEquals(cm.size(), 0);
+        //
+        final int numMessages = 1000;
+        final int maxKeys = 10;
+        Random r = new Random(0);
+        for (int j = 0; j < numMessages; j++) {
+            int keyIndex = r.nextInt(maxKeys);
+            String key = "key"+keyIndex;
+            byte[] data = ("my-message-" + key + "-" + j).getBytes();
+            producer.newMessage()
+                    .key(key)
+                    .value(data)
+                    .send();
+        }
+        ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
+        Compactor compactor = pulsar.getCompactor(true);
+        compactor.compact(topicName).get();
+        statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        metricsStr = statsOut.toString();
+        metrics = parseMetrics(metricsStr);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_removed_event_count");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 990);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_succeed_count");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 1);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_failed_count");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_duration_time_in_mills");
+        assertEquals(cm.size(), 1);
+        assertTrue(cm.get(0).value > 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_read_throughput");
+        assertEquals(cm.size(), 1);
+        assertTrue(cm.get(0).value > 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_write_throughput");
+        assertEquals(cm.size(), 1);
+        assertTrue(cm.get(0).value > 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_count");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 10);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_size");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 870);
+
+        pulsarClient.close();
+    }
+
+    private void compareCompactionStateCount(List<Metric> cm, double count) {
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+        assertEquals(cm.get(0).tags.get("broker"), "localhost");
+        assertEquals(cm.get(0).value, count);
+    }
+
     /**
      * Hacky parsing of Prometheus text format. Should be good enough for unit tests
      */
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
index 9784928..39ecc42 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
@@ -43,6 +43,14 @@ public class AggregatedNamespaceStatsTest {
         topicStats1.msgBacklog = 30;
         topicStats1.managedLedgerStats.storageWriteRate = 12.0;
         topicStats1.managedLedgerStats.storageReadRate = 6.0;
+        topicStats1.compactionRemovedEventCount = 10;
+        topicStats1.compactionSucceedCount = 1;
+        topicStats1.compactionFailedCount = 2;
+        topicStats1.compactionDurationTimeInMills = 1000;
+        topicStats1.compactionReadThroughput = 15.0;
+        topicStats1.compactionWriteThroughput = 20.0;
+        topicStats1.compactionCompactedEntriesCount = 30;
+        topicStats1.compactionCompactedEntriesSize = 1000;
 
         AggregatedReplicationStats replStats1 = new AggregatedReplicationStats();
         replStats1.msgRateIn = 1.0;
@@ -75,6 +83,14 @@ public class AggregatedNamespaceStatsTest {
         topicStats2.msgBacklog = 7;
         topicStats2.managedLedgerStats.storageWriteRate = 5.0;
         topicStats2.managedLedgerStats.storageReadRate = 2.5;
+        topicStats2.compactionRemovedEventCount = 10;
+        topicStats2.compactionSucceedCount = 1;
+        topicStats2.compactionFailedCount = 2;
+        topicStats2.compactionDurationTimeInMills = 1000;
+        topicStats2.compactionReadThroughput = 15.0;
+        topicStats2.compactionWriteThroughput = 20.0;
+        topicStats2.compactionCompactedEntriesCount = 30;
+        topicStats2.compactionCompactedEntriesSize = 1000;
 
         AggregatedReplicationStats replStats2 = new AggregatedReplicationStats();
         replStats2.msgRateIn = 3.5;
@@ -113,6 +129,15 @@ public class AggregatedNamespaceStatsTest {
         assertEquals(nsStats.managedLedgerStats.storageSize, 6144);
         assertEquals(nsStats.managedLedgerStats.storageLogicalSize, 2560);
 
+        assertEquals(nsStats.compactionRemovedEventCount, 20);
+        assertEquals(nsStats.compactionSucceedCount, 2);
+        assertEquals(nsStats.compactionFailedCount, 4);
+        assertEquals(nsStats.compactionDurationTimeInMills, 2000);
+        assertEquals(nsStats.compactionReadThroughput, 30.0);
+        assertEquals(nsStats.compactionWriteThroughput, 40.0);
+        assertEquals(nsStats.compactionCompactedEntriesCount, 60);
+        assertEquals(nsStats.compactionCompactedEntriesSize, 2000);
+
         AggregatedReplicationStats nsReplStats = nsStats.replicationStats.get(namespace);
         assertNotNull(nsReplStats);
         assertEquals(nsReplStats.msgRateIn, 4.5);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
index b865396..a3f2cae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
@@ -18,12 +18,9 @@
  */
 package org.apache.pulsar.compaction;
 
-import org.apache.pulsar.broker.service.BrokerService;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
-import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -36,15 +33,38 @@ public class CompactorMXBeanImplTest {
         CompactorMXBeanImpl mxBean = new CompactorMXBeanImpl();
         String topic = "topic1";
         mxBean.addCompactionStartOp(topic);
-        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+        CompactionRecord compaction = mxBean.getCompactionRecordForTopic(topic).get();
+        assertEquals(compaction.getLastCompactionRemovedEventCount(), 0, 0);
         mxBean.addCompactionRemovedEvent(topic);
-        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+        assertEquals(compaction.getLastCompactionRemovedEventCount(), 0, 0);
         mxBean.addCompactionEndOp(topic, true);
+        assertEquals(compaction.getLastCompactionRemovedEventCount(), 1, 0);
+        assertTrue(compaction.getLastCompactionSucceedTimestamp() > 0L);
+        assertTrue(compaction.getLastCompactionDurationTimeInMills() >= 0L);
+        assertEquals(compaction.getLastCompactionFailedTimestamp(), 0, 0);
+        assertEquals(compaction.getCompactionRemovedEventCount(), 1, 0);
+        assertEquals(compaction.getCompactionSucceedCount(), 1, 0);
+        assertEquals(compaction.getCompactionFailedCount(), 0, 0);
+        assertTrue(compaction.getCompactionDurationTimeInMills() >= 0L);
+        mxBean.addCompactionStartOp(topic);
+        mxBean.addCompactionRemovedEvent(topic);
         mxBean.addCompactionEndOp(topic, false);
-        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 1, 0);
-        assertTrue(mxBean.getLastCompactionSucceedTimestamp(topic) > 0L);
-        assertTrue(mxBean.getLastCompactionFailedTimestamp(topic) > 0L);
-        assertTrue(mxBean.getLastCompactionDurationTimeInMills(topic) >= 0L);
+        assertEquals(compaction.getCompactionRemovedEventCount(), 2, 0);
+        assertEquals(compaction.getCompactionFailedCount(), 1, 0);
+        assertEquals(compaction.getCompactionSucceedCount(), 1, 0);
+        assertTrue(compaction.getLastCompactionFailedTimestamp() > 0L);
+        assertTrue(compaction.getCompactionDurationTimeInMills() >= 0L);
+        mxBean.addCompactionReadOp(topic, 22);
+        assertTrue(compaction.getCompactionReadThroughput() > 0L);
+        mxBean.addCompactionWriteOp(topic, 33);
+        assertTrue(compaction.getCompactionWriteThroughput() > 0L);
+        mxBean.addCompactionLatencyOp(topic, 10, TimeUnit.NANOSECONDS);
+        assertTrue(compaction.getCompactionLatencyBuckets()[0] > 0l);
+        mxBean.reset();
+        assertEquals(compaction.getCompactionRemovedEventCount(), 0, 0);
+        assertEquals(compaction.getCompactionSucceedCount(), 0, 0);
+        assertEquals(compaction.getCompactionFailedCount(), 0, 0);
+        assertEquals(compaction.getCompactionDurationTimeInMills(), 0, 0);
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 8197c4a..3381aee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -113,10 +113,11 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
             m.close();
         }
         if (checkMetrics) {
-            long compactedTopicRemovedEventCount = compactor.getStats().getLastCompactionRemovedEventCount(topic);
-            long lastCompactSucceedTimestamp = compactor.getStats().getLastCompactionSucceedTimestamp(topic);
-            long lastCompactFailedTimestamp = compactor.getStats().getLastCompactionFailedTimestamp(topic);
-            long lastCompactDurationTimeInMills = compactor.getStats().getLastCompactionDurationTimeInMills(topic);
+            CompactionRecord compactionRecord = compactor.getStats().getCompactionRecordForTopic(topic).get();
+            long compactedTopicRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
+            long lastCompactSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
+            long lastCompactFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
+            long lastCompactDurationTimeInMills = compactionRecord.getLastCompactionDurationTimeInMills();
             Assert.assertTrue(compactedTopicRemovedEventCount >= 1);
             Assert.assertTrue(lastCompactSucceedTimestamp >= 1L);
             Assert.assertTrue(lastCompactDurationTimeInMills >= 0L);
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 839029b..3124e18 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -204,6 +204,16 @@ All the topic metrics are labelled with the following labels:
 | pulsar_in_messages_total | Counter | The total number of messages received for this topic |
 | pulsar_out_bytes_total | Counter | The total number of bytes read from this topic |
 | pulsar_out_messages_total | Counter | The total number of messages read from this topic |
+| pulsar_compaction_removed_event_count | Gauge | The removed event count of compaction |
+| pulsar_compaction_succeed_count | Gauge | The succeed count of compaction |
+| pulsar_compaction_failed_count | Gauge | The failed count of compaction |
+| pulsar_compaction_duration_time_in_mills | Gauge | The duration time of compaction |
+| pulsar_compaction_read_throughput | Gauge | The read throughput of compaction |
+| pulsar_compaction_write_throughput | Gauge | The write throughput of compaction |
+| pulsar_compaction_latency_le_* | Histogram | The compaction latency with given quantile. <br> Available thresholds: <br><ul><li>pulsar_compaction_latency_le_0_5: <= 0.5ms </li><li>pulsar_compaction_latency_le_1: <= 1ms</li><li>pulsar_compaction_latency_le_5: <= 5ms</li><li>pulsar_compaction_latency_le_10: <= 10ms</li><li>pulsar_compaction_latency_le_20: <= 20ms</li><li>pulsar_compaction_latency_le_50: <= 50ms</li><li>pulsar_compaction_latency_le_100: <= 100ms</li><li>pulsar_compaction_ [...]
+| pulsar_compaction_compacted_entries_count | Gauge | The compacted entries count |
+| pulsar_compaction_compacted_entries_size |Gauge  | The compacted entries size |
+
 
 #### Replication metrics