You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/02 03:05:27 UTC

[pulsar] 02/05: Add compacted topic metrics for TopicStats in CLI (#11564)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 71cc13be016646da47ba7298c585ae1f484507a5
Author: GuoJiwei <te...@apache.org>
AuthorDate: Sat Aug 21 22:58:20 2021 +0800

    Add compacted topic metrics for TopicStats in CLI (#11564)
    
    Add below metrics to help track potential flows or examine the overall condition of compacted topics .
    - lastCompactionRemovedEventCount : the removed event count of last compaction
    - lastCompactionSucceedTimestamp : the timestamp of last succeed compaction
    - lastCompactionFailedTimestamp : the timestamp of last failed compaction
    - lastCompactionDurationTimeInMills: the duration time of last compaction
    
    These 4 metrics will be displayed in topic stats CLI :
    ```
    ./pulsar-admin topics stats persistent://tenant/ns/topic
    ```
    
    This patch will add metrics in CLI , which would generate doc automatically.
    
    (cherry picked from commit c0ef593990ce8a7ea9ee6f1def880f71def3fc97)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 14 ++--
 .../pulsar/broker/service/BrokerService.java       | 13 ++-
 .../broker/service/persistent/PersistentTopic.java | 22 ++++++
 .../org/apache/pulsar/compaction/Compactor.java    | 29 ++++---
 .../apache/pulsar/compaction/CompactorMXBean.java  | 57 ++++++++++++++
 .../pulsar/compaction/CompactorMXBeanImpl.java     | 92 ++++++++++++++++++++++
 .../pulsar/compaction/TwoPhaseCompactor.java       | 14 +++-
 .../pulsar/compaction/CompactorMXBeanImplTest.java | 50 ++++++++++++
 .../apache/pulsar/compaction/CompactorTest.java    | 20 +++--
 .../common/policies/data/CompactionStats.java      | 37 +++++++++
 .../pulsar/common/policies/data/TopicStats.java    |  3 +
 .../policies/data/stats/CompactionStatsImpl.java   | 47 +++++++++++
 .../common/policies/data/stats/TopicStatsImpl.java |  6 +-
 .../policies/data/PersistentTopicStatsTest.java    |  4 +
 14 files changed, 383 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f7a1337..c09ec45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1313,12 +1313,16 @@ public class PulsarService implements AutoCloseable {
     // only public so mockito can mock it
     public Compactor newCompactor() throws PulsarServerException {
         return new TwoPhaseCompactor(this.getConfiguration(),
-                                     getClient(), getBookKeeperClient(),
-                                     getCompactorExecutor());
+                getClient(), getBookKeeperClient(),
+                getCompactorExecutor());
     }
 
     public synchronized Compactor getCompactor() throws PulsarServerException {
-        if (this.compactor == null) {
+        return getCompactor(true);
+    }
+
+    public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
+        if (this.compactor == null && shouldInitialize) {
             this.compactor = newCompactor();
         }
         return this.compactor;
@@ -1327,8 +1331,8 @@ public class PulsarService implements AutoCloseable {
     protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
         if (this.offloaderScheduler == null) {
             this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
-                .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
-                .name("offloader").build();
+                    .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
+                    .name("offloader").build();
         }
         return this.offloaderScheduler;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ce9abcb..74fde84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -164,6 +164,10 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.common.util.netty.ChannelFutures;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.common.util.netty.NettyFutureUtil;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
@@ -1770,8 +1774,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 }
             }
         }
-
         topics.remove(topic);
+
+        try {
+            Compactor compactor = pulsar.getCompactor(false);
+            if (compactor != null) {
+                compactor.getStats().removeTopic(topic);
+            }
+        } catch (PulsarServerException ignore) {
+        }
     }
 
     public int getNumberOfNamespaceBundles() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2808021..b017750 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -150,6 +150,8 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
@@ -1899,9 +1901,29 @@ public class PersistentTopic extends AbstractTopic
         stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
         stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
         stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
+        Optional<CompactorMXBean> mxBean = getCompactorMXBean();
+        stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
+                stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
+        stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
+                stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
+        stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
+                stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
+        stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
+                stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);
+
         return stats;
     }
 
+    private Optional<CompactorMXBean> getCompactorMXBean() {
+        Compactor compactor = null;
+        try {
+            compactor = brokerService.pulsar().getCompactor(false);
+        } catch (PulsarServerException ex) {
+            log.warn("get compactor error", ex);
+        }
+        return Optional.ofNullable(compactor).map(c -> c.getStats());
+    }
+
     @Override
     public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
index 61032d6..cb631e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
@@ -42,6 +42,7 @@ public abstract class Compactor {
     protected final ScheduledExecutorService scheduler;
     private final PulsarClient pulsar;
     private final BookKeeper bk;
+    protected final CompactorMXBeanImpl mxBean;
 
     public Compactor(ServiceConfiguration conf,
                      PulsarClient pulsar,
@@ -51,6 +52,7 @@ public abstract class Compactor {
         this.scheduler = scheduler;
         this.pulsar = pulsar;
         this.bk = bk;
+        this.mxBean = new CompactorMXBeanImpl();
     }
 
     public CompletableFuture<Long> compact(String topic) {
@@ -60,23 +62,30 @@ public abstract class Compactor {
 
     private CompletableFuture<Long> compactAndCloseReader(RawReader reader) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
+        mxBean.addCompactionStartOp(reader.getTopic());
         doCompaction(reader, bk).whenComplete(
                 (ledgerId, exception) -> {
                     reader.closeAsync().whenComplete((v, exception2) -> {
-                            if (exception2 != null) {
-                                log.warn("Error closing reader handle {}, ignoring", reader, exception2);
-                            }
-                            if (exception != null) {
-                                // complete with original exception
-                                promise.completeExceptionally(exception);
-                            } else {
-                                promise.complete(ledgerId);
-                            }
-                        });
+                        if (exception2 != null) {
+                            log.warn("Error closing reader handle {}, ignoring", reader, exception2);
+                        }
+                        if (exception != null) {
+                            // complete with original exception
+                            mxBean.addCompactionEndOp(reader.getTopic(), false);
+                            promise.completeExceptionally(exception);
+                        } else {
+                            mxBean.addCompactionEndOp(reader.getTopic(), true);
+                            promise.complete(ledgerId);
+                        }
+                    });
                 });
         return promise;
     }
 
     protected abstract CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk);
+
+    public CompactorMXBean getStats() {
+        return this.mxBean;
+    }
 }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
new file mode 100644
index 0000000..54ca2e8
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+
+/**
+ * JMX Bean interface for Compactor stats.
+ */
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Stable
+public interface CompactorMXBean {
+
+    /**
+     * @return the removed event count of last compaction
+     */
+    long getLastCompactionRemovedEventCount(String topic);
+
+    /**
+     * @return the timestamp of last succeed compaction
+     */
+    long getLastCompactionSucceedTimestamp(String topic);
+
+    /**
+     * @return the timestamp of last failed compaction
+     */
+    long getLastCompactionFailedTimestamp(String topic);
+
+    /**
+     * @return the duration time of last compaction
+     */
+    long getLastCompactionDurationTimeInMills(String topic);
+
+    /**
+     *  Remove metrics about this topic.
+     * @param topic
+     */
+    void removeTopic(String topic);
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
new file mode 100644
index 0000000..05db2aa
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+
+public class CompactorMXBeanImpl implements CompactorMXBean {
+
+    private final ConcurrentHashMap<String, CompactRecord> compactRecordOps = new ConcurrentHashMap<>();
+
+    public void addCompactionRemovedEvent(String topic) {
+        compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent();
+    }
+
+    public void addCompactionStartOp(String topic) {
+        compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset();
+    }
+
+    public void addCompactionEndOp(String topic, boolean succeed) {
+        CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord());
+        compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis()
+                - compactRecord.lastCompactionStartTimeOp;
+        compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue();
+        if (succeed) {
+            compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis();
+        } else {
+            compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis();
+        }
+    }
+
+    @Override
+    public long getLastCompactionRemovedEventCount(String topic) {
+        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount;
+    }
+
+    @Override
+    public long getLastCompactionSucceedTimestamp(String topic) {
+        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp;
+    }
+
+    @Override
+    public long getLastCompactionFailedTimestamp(String topic) {
+        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp;
+    }
+
+    @Override
+    public long getLastCompactionDurationTimeInMills(String topic) {
+        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills;
+    }
+
+    @Override
+    public void removeTopic(String topic) {
+        compactRecordOps.remove(topic);
+    }
+
+    static class CompactRecord {
+
+        private long lastCompactionRemovedEventCount = 0L;
+        private long lastCompactionSucceedTimestamp = 0L;
+        private long lastCompactionFailedTimestamp = 0L;
+        private long lastCompactionDurationTimeInMills = 0L;
+
+        private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
+        private long lastCompactionStartTimeOp;
+
+        public void addCompactionRemovedEvent() {
+            lastCompactionRemovedEventCountOp.increment();
+        }
+
+        public void reset() {
+            lastCompactionRemovedEventCountOp.reset();
+            lastCompactionStartTimeOp = System.currentTimeMillis();
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 0f0f981..49d121f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -124,18 +124,23 @@ public class TwoPhaseCompactor extends Compactor {
             try {
                 MessageId id = m.getMessageId();
                 boolean deletedMessage = false;
+                boolean replaceMessage = false;
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
                         for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
                                 .extractIdsAndKeysAndSize(m)) {
                             if (e != null) {
                                 if (e.getRight() > 0) {
-                                    latestForKey.put(e.getMiddle(), e.getLeft());
+                                    MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
+                                    replaceMessage = old != null;
                                 } else {
                                     deletedMessage = true;
                                     latestForKey.remove(e.getMiddle());
                                 }
                             }
+                            if (replaceMessage || deletedMessage) {
+                                mxBean.addCompactionRemovedEvent(reader.getTopic());
+                            }
                         }
                     } catch (IOException ioe) {
                         log.info("Error decoding batch for message {}. Whole batch will be included in output",
@@ -145,14 +150,17 @@ public class TwoPhaseCompactor extends Compactor {
                     Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
                     if (keyAndSize != null) {
                         if (keyAndSize.getRight() > 0) {
-                            latestForKey.put(keyAndSize.getLeft(), id);
+                            MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
+                            replaceMessage = old != null;
                         } else {
                             deletedMessage = true;
                             latestForKey.remove(keyAndSize.getLeft());
                         }
                     }
+                    if (replaceMessage || deletedMessage) {
+                        mxBean.addCompactionRemovedEvent(reader.getTopic());
+                    }
                 }
-
                 MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
                 MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
                 if (id.compareTo(lastMessageId) == 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
new file mode 100644
index 0000000..b865396
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.broker.service.BrokerService;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Test(groups = "broker-compaction")
+public class CompactorMXBeanImplTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        CompactorMXBeanImpl mxBean = new CompactorMXBeanImpl();
+        String topic = "topic1";
+        mxBean.addCompactionStartOp(topic);
+        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+        mxBean.addCompactionRemovedEvent(topic);
+        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+        mxBean.addCompactionEndOp(topic, true);
+        mxBean.addCompactionEndOp(topic, false);
+        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 1, 0);
+        assertTrue(mxBean.getLastCompactionSucceedTimestamp(topic) > 0L);
+        assertTrue(mxBean.getLastCompactionFailedTimestamp(topic) > 0L);
+        assertTrue(mxBean.getLastCompactionDurationTimeInMills(topic) >= 0L);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 0d1a95c..8197c4a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -84,7 +84,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         compactionScheduler.shutdownNow();
     }
 
-    private List<String> compactAndVerify(String topic, Map<String, byte[]> expected) throws Exception {
+    private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception {
         BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
                 this.conf, null, null, Optional.empty(), null);
         Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
@@ -112,6 +112,16 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
                                 "Compacted version should match expected version");
             m.close();
         }
+        if (checkMetrics) {
+            long compactedTopicRemovedEventCount = compactor.getStats().getLastCompactionRemovedEventCount(topic);
+            long lastCompactSucceedTimestamp = compactor.getStats().getLastCompactionSucceedTimestamp(topic);
+            long lastCompactFailedTimestamp = compactor.getStats().getLastCompactionFailedTimestamp(topic);
+            long lastCompactDurationTimeInMills = compactor.getStats().getLastCompactionDurationTimeInMills(topic);
+            Assert.assertTrue(compactedTopicRemovedEventCount >= 1);
+            Assert.assertTrue(lastCompactSucceedTimestamp >= 1L);
+            Assert.assertTrue(lastCompactDurationTimeInMills >= 0L);
+            Assert.assertEquals(lastCompactFailedTimestamp, 0L);
+        }
         Assert.assertTrue(expected.isEmpty(), "All expected keys should have been found");
         return keys;
     }
@@ -140,7 +150,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
                     .send();
             expected.put(key, data);
         }
-        compactAndVerify(topic, expected);
+        compactAndVerify(topic, expected, true);
     }
 
     @Test
@@ -169,7 +179,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         expected.put("a", "A_2".getBytes());
         expected.put("b", "B_1".getBytes());
 
-        compactAndVerify(topic, new HashMap<>(expected));
+        compactAndVerify(topic, new HashMap<>(expected), false);
 
         producer.newMessage()
                 .key("b")
@@ -177,7 +187,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
                 .send();
         expected.put("b", "B_2".getBytes());
 
-        compactAndVerify(topic, expected);
+        compactAndVerify(topic, expected, false);
     }
 
     @Test
@@ -206,7 +216,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         expected.put("b", "B_1".getBytes());
         expected.put("c", "C_1".getBytes());
 
-        List<String> keyOrder = compactAndVerify(topic, expected);
+        List<String> keyOrder = compactAndVerify(topic, expected, false);
 
         Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a"));
     }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java
new file mode 100644
index 0000000..0500017
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Statistics about compaction.
+ */
+public interface CompactionStats {
+
+    /** The removed event count of last compaction. */
+    long getLastCompactionRemovedEventCount();
+
+    /** The timestamp of last succeed compaction. */
+    long getLastCompactionSucceedTimestamp();
+
+    /** The timestamp of last failed compaction. */
+    long getLastCompactionFailedTimestamp();
+
+    /** The duration time of last compaction. */
+    long getLastCompactionDurationTimeInMills();
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index afc1810..dc1964e 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -85,4 +85,7 @@ public interface TopicStats {
 
     /** The serialized size of non-contiguous deleted messages ranges. */
     int getNonContiguousDeletedMessagesRangesSerializedSize();
+
+    /** The compaction stats. */
+    CompactionStats getCompaction();
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java
new file mode 100644
index 0000000..e187f6a
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import lombok.Data;
+import org.apache.pulsar.common.policies.data.CompactionStats;
+/**
+ * Statistics about compaction.
+ */
+@Data
+public class CompactionStatsImpl implements CompactionStats {
+
+    /** The removed event count of last compaction. */
+    public long lastCompactionRemovedEventCount;
+
+    /** The timestamp of last succeed compaction. */
+    public long lastCompactionSucceedTimestamp;
+
+    /** The timestamp of last failed compaction. */
+    public long lastCompactionFailedTimestamp;
+
+    /** The duration time of last compaction. */
+    public long lastCompactionDurationTimeInMills;
+
+    public void reset() {
+        this.lastCompactionRemovedEventCount = 0;
+        this.lastCompactionSucceedTimestamp = 0;
+        this.lastCompactionFailedTimestamp = 0;
+        this.lastCompactionDurationTimeInMills = 0;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 7ad8e83..f3b3944 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.TreeMap;
 
 /**
@@ -114,6 +113,9 @@ public class TopicStatsImpl implements TopicStats {
     /** The serialized size of non-contiguous deleted messages ranges. */
     public int nonContiguousDeletedMessagesRangesSerializedSize;
 
+    /** The compaction stats */
+    public CompactionStatsImpl compaction;
+
     public List<? extends PublisherStats> getPublishers() {
         return publishers;
     }
@@ -130,6 +132,7 @@ public class TopicStatsImpl implements TopicStats {
         this.publishers = new ArrayList<>();
         this.subscriptions = new HashMap<>();
         this.replication = new TreeMap<>();
+        this.compaction = new CompactionStatsImpl();
     }
 
     public void reset() {
@@ -157,6 +160,7 @@ public class TopicStatsImpl implements TopicStats {
         this.lastOffloadLedgerId = 0;
         this.lastOffloadFailureTimeStamp = 0;
         this.lastOffloadSuccessTimeStamp = 0;
+        this.compaction.reset();
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index fa67fb0..9c4c3b2 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -60,6 +60,10 @@ public class PersistentTopicStatsTest {
         assertEquals(topicStats.publishers.size(), 1);
         assertEquals(topicStats.subscriptions.size(), 1);
         assertEquals(topicStats.replication.size(), 1);
+        assertEquals(topicStats.compaction.lastCompactionRemovedEventCount, 0);
+        assertEquals(topicStats.compaction.lastCompactionSucceedTimestamp, 0);
+        assertEquals(topicStats.compaction.lastCompactionFailedTimestamp, 0);
+        assertEquals(topicStats.compaction.lastCompactionDurationTimeInMills, 0);
         topicStats.reset();
         assertEquals(topicStats.msgRateIn, 0.0);
         assertEquals(topicStats.msgThroughputIn, 0.0);