You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/02 03:05:25 UTC

[pulsar] branch branch-2.8 updated (6bc1e0d -> 19b2b69)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 6bc1e0d  Source tarball: apply executable file permissions to shell scripts (fixes #10917)  (#11858)
     new 875bfdf  [python-client] Fixed crash when using Python logger (#10981)
     new 71cc13b  Add compacted topic metrics for TopicStats in CLI (#11564)
     new a80f1e8  Expose compaction metrics to Prometheus (#11739)
     new bbed232  [Broker] Refine topic level backlog quota policies warning log (#11863)
     new 19b2b69  Fix the checkstyle issue during the cherry-pick

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java    |  14 ++-
 .../pulsar/broker/service/BacklogQuotaManager.java |   7 +-
 .../pulsar/broker/service/BrokerService.java       |  10 +-
 .../broker/service/persistent/PersistentTopic.java |  52 +++++++--
 .../broker/stats/metrics/AbstractMetrics.java      |  23 +---
 .../stats/prometheus/AggregatedNamespaceStats.java |  22 ++++
 .../stats/prometheus/NamespaceStatsAggregator.java |  53 ++++++++-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  73 +++++++++++-
 .../pulsar/compaction/CompactedTopicContext.java   |  19 +--
 .../pulsar/compaction/CompactedTopicImpl.java      |  12 --
 .../apache/pulsar/compaction/CompactionRecord.java | 130 +++++++++++++++++++++
 .../org/apache/pulsar/compaction/Compactor.java    |  29 +++--
 .../apache/pulsar/compaction/CompactorMXBean.java  |  24 ++--
 .../pulsar/compaction/CompactorMXBeanImpl.java     |  71 +++++++++++
 .../pulsar/compaction/TwoPhaseCompactor.java       |  24 +++-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  93 +++++++++++++++
 .../prometheus/AggregatedNamespaceStatsTest.java   |  25 ++++
 .../pulsar/compaction/CompactorMXBeanImplTest.java |  70 +++++++++++
 .../apache/pulsar/compaction/CompactorTest.java    |  21 +++-
 .../data/{AuthAction.java => CompactionStats.java} |  25 ++--
 .../pulsar/common/policies/data/TopicStats.java    |   3 +
 pulsar-client-cpp/python/src/config.cc             |  65 +++++------
 ...atorStatsImpl.java => CompactionStatsImpl.java} |  39 ++++---
 .../common/policies/data/stats/TopicStatsImpl.java |   6 +-
 .../policies/data/PersistentTopicStatsTest.java    |   4 +
 site2/docs/reference-metrics.md                    |  10 ++
 26 files changed, 761 insertions(+), 163 deletions(-)
 copy pulsar-common/src/main/java/org/apache/pulsar/common/intercept/InterceptException.java => pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java (63%)
 create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
 copy managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Position.java => pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java (71%)
 create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
 copy pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/{AuthAction.java => CompactionStats.java} (65%)
 copy pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/{NonPersistentReplicatorStatsImpl.java => CompactionStatsImpl.java} (52%)

[pulsar] 04/05: [Broker] Refine topic level backlog quota policies warning log (#11863)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bbed232f10bfdf44bf288c27610a71d99a922aa9
Author: ran <ga...@126.com>
AuthorDate: Thu Sep 2 09:44:17 2021 +0800

    [Broker] Refine topic level backlog quota policies warning log (#11863)
    
    ### Motivation
    
    Currently, if the `TopicPolicies` isn't initialized completely, the topic gets backlog quota will print the error log stack, this is not necessary, because after the TopicPolicies initialized, the topic level backlog quota will come into effect.
    
    ```
    15:24:01.962 [bookkeeper-ml-workers-OrderedExecutor-2-0] WARN  org.apache.pulsar.broker.service.BacklogQuotaManager - Failed to read topic policies data, will apply the namespace backlog quota: topicName=persistent://public/ns3/t1
    org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
    	at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148) ~[org.apache.pulsar-pulsar-broker-2.7.3.jar:2.7.3]
    	at org.apache.pulsar.broker.service.BacklogQuotaManager.getBacklogQuota(BacklogQuotaManager.java:85) [org.apache.pulsar-pulsar-broker-2.7.3.jar:2.7.3]
    	at org.apache.pulsar.broker.service.persistent.PersistentTopic.getBacklogQuota(PersistentTopic.java:2172) [org.apache.pulsar-pulsar-broker-2.7.3.jar:2.7.3]
    	at org.apache.pulsar.broker.service.persistent.PersistentTopic.isBacklogQuotaExceeded(PersistentTopic.java:2181) [org.apache.pulsar-pulsar-broker-2.7.3.jar:2.7.3]
    	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleProducer$20(ServerCnx.java:1100) [org.apache.pulsar-pulsar-broker-2.7.3.jar:2.7.3]
    	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) [?:?]
    	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
    	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) [?:?]
    	at org.apache.pulsar.broker.service.BrokerService$3.lambda$openLedgerComplete$2(BrokerService.java:1125) [org.apache.pulsar-pulsar-broker-2.7.3.jar:2.7.3]
    	at java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:815) [?:?]
    	at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:799) [?:?]
    	at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2121) [?:?]
    	at org.apache.pulsar.broker.service.BrokerService$3.openLedgerComplete(BrokerService.java:1111) [org.apache.pulsar-pulsar-broker-2.7.3.jar:2.7.3]
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda$asyncOpen$8(ManagedLedgerFactoryImpl.java:425) [org.apache.pulsar-managed-ledger-2.7.3.jar:2.7.3]
    	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) [?:?]
    	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) [?:?]
    	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) [?:?]
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl$2.initializeComplete(ManagedLedgerFactoryImpl.java:397) [org.apache.pulsar-managed-ledger-2.7.3.jar:2.7.3]
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3.operationComplete(ManagedLedgerImpl.java:476) [org.apache.pulsar-managed-ledger-2.7.3.jar:2.7.3]
    	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$3.operationComplete(ManagedLedgerImpl.java:466) [org.apache.pulsar-managed-ledger-2.7.3.jar:2.7.3]
    	at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda$getCursors$8(MetaStoreImpl.java:122) [org.apache.pulsar-managed-ledger-2.7.3.jar:2.7.3]
    	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) [?:?]
    	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) [?:?]
    	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.12.0.jar:4.12.0]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
    	at java.lang.Thread.run(Thread.java:834) [?:?]
    ```
    
    ### Modifications
    
    Remove the error log stack.
    
    (cherry picked from commit d370d0011cdd7256633d4bf10d73f84bdee1e28b)
---
 .../java/org/apache/pulsar/broker/service/BacklogQuotaManager.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index bbb3ddf..c9cd15e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -90,9 +90,12 @@ public class BacklogQuotaManager {
                     .map(TopicPolicies::getBackLogQuotaMap)
                     .map(map -> map.get(backlogQuotaType.name()))
                     .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType));
+        } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
+            log.debug("Topic policies cache have not init, will apply the namespace backlog quota: topicName={}",
+                    topicName);
         } catch (Exception e) {
-            log.warn("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}",
-                    topicName, e);
+            log.error("Failed to read topic policies data, "
+                            + "will apply the namespace backlog quota: topicName={}", topicName, e);
         }
         return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType);
     }

[pulsar] 03/05: Expose compaction metrics to Prometheus (#11739)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a80f1e8774d1c8f6505f19ed8ec1b97d75fa47d9
Author: GuoJiwei <te...@apache.org>
AuthorDate: Tue Aug 31 22:58:44 2021 +0800

    Expose compaction metrics to Prometheus (#11739)
    
    ### Motivation
    As #11564 has involved compaction metrics in CLI, it's extremely useful to expose relative metrics to Prometheus.
    
    - pulsar_compaction_removed_event_count :  the removed event count of compaction .
    - pulsar_compaction_succeed_count : the succeed count of compaction.
    - pulsar_compaction_failed_count : the failed count of compaction.
    - pulsar_compaction_duration_time_in_millis : the duration time of compaction.
    - pulsar_compaction_read_throughput : the read throughput of compaction.
    - pulsar_compaction_write_throughput : the write throughput of compaction.
    - pulsar_compaction_compacted_entries_count: the compacted entries count.
    - pulsar_compaction_compacted_entries_size: the compacted entries size;
    
    if users enable the topic level metrics and the topic has been compacted or doing the compaction, we should expose the compaction metrics for the topic level. If users disable the topic level metrics, we should expose the aggregated compaction metrics for the namespace level.
    
    
    (cherry picked from commit 656635ef7638c9df6be9513bd38d34f6872b2b29)
---
 .../broker/service/persistent/PersistentTopic.java |  46 ++++----
 .../broker/stats/metrics/AbstractMetrics.java      |  23 +---
 .../stats/prometheus/AggregatedNamespaceStats.java |  22 ++++
 .../stats/prometheus/NamespaceStatsAggregator.java |  53 ++++++++-
 .../pulsar/broker/stats/prometheus/TopicStats.java |  73 +++++++++++-
 .../pulsar/compaction/CompactedTopicContext.java   |  37 ++++++
 .../pulsar/compaction/CompactedTopicImpl.java      |  12 --
 .../apache/pulsar/compaction/CompactionRecord.java | 130 +++++++++++++++++++++
 .../apache/pulsar/compaction/CompactorMXBean.java  |  26 +----
 .../pulsar/compaction/CompactorMXBeanImpl.java     |  65 ++++-------
 .../pulsar/compaction/TwoPhaseCompactor.java       |  10 +-
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  93 +++++++++++++++
 .../prometheus/AggregatedNamespaceStatsTest.java   |  25 ++++
 .../pulsar/compaction/CompactorMXBeanImplTest.java |  40 +++++--
 .../apache/pulsar/compaction/CompactorTest.java    |   9 +-
 site2/docs/reference-metrics.md                    |  10 ++
 16 files changed, 539 insertions(+), 135 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index b017750..a4db0bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -149,6 +149,7 @@ import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
+import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.CompactorMXBean;
@@ -1902,15 +1903,16 @@ public class PersistentTopic extends AbstractTopic
         stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
         stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
         Optional<CompactorMXBean> mxBean = getCompactorMXBean();
-        stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
-                stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
-        stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
-                stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
-        stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
-                stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
-        stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
-                stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);
 
+        stats.compaction.reset();
+        mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> {
+            stats.compaction.lastCompactionRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
+            stats.compaction.lastCompactionSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
+            stats.compaction.lastCompactionFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
+            stats.compaction.lastCompactionDurationTimeInMills =
+                    compactionRecord.getLastCompactionDurationTimeInMills();
+            return compactionRecord;
+        });
         return stats;
     }
 
@@ -1987,19 +1989,14 @@ public class PersistentTopic extends AbstractTopic
         info.entries = -1;
         info.size = -1;
 
-        try {
-            Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext =
-                    ((CompactedTopicImpl) compactedTopic)
-                            .getCompactedTopicContext();
-            if (compactedTopicContext.isPresent()) {
-                CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get();
-                info.ledgerId = ledgerContext.getLedger().getId();
-                info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
-                info.size = ledgerContext.getLedger().getLength();
-            }
-        } catch (ExecutionException | InterruptedException e) {
-            log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
+        Optional<CompactedTopicContext> compactedTopicContext = getCompactedTopicContext();
+        if (compactedTopicContext.isPresent()) {
+            CompactedTopicContext ledgerContext = compactedTopicContext.get();
+            info.ledgerId = ledgerContext.getLedger().getId();
+            info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
+            info.size = ledgerContext.getLedger().getLength();
         }
+
         stats.compactedLedger = info;
 
         stats.cursors = Maps.newTreeMap();
@@ -2116,6 +2113,15 @@ public class PersistentTopic extends AbstractTopic
         return statFuture;
     }
 
+    public Optional<CompactedTopicContext> getCompactedTopicContext() {
+        try {
+            return ((CompactedTopicImpl) compactedTopic).getCompactedTopicContext();
+        } catch (ExecutionException | InterruptedException e) {
+            log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
+        }
+        return Optional.empty();
+    }
+
     public long getBacklogSize() {
         return ledger.getEstimatedBacklogSize();
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
index ed6e79f..610d22c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
@@ -214,11 +214,7 @@ abstract class AbstractMetrics {
     }
 
     protected void populateAggregationMap(Map<String, List<Double>> map, String mkey, double value) {
-        if (!map.containsKey(mkey)) {
-            map.put(mkey, Lists.newArrayList(value));
-        } else {
-            map.get(mkey).add(value);
-        }
+        map.computeIfAbsent(mkey, __ -> Lists.newArrayList()).add(value);
     }
 
     protected void populateAggregationMapWithSum(Map<String, Double> map, String mkey, double value) {
@@ -242,24 +238,11 @@ abstract class AbstractMetrics {
      */
     protected void populateDimensionMap(Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap, Metrics metrics,
             ManagedLedgerImpl ledger) {
-        if (!ledgersByDimensionMap.containsKey(metrics)) {
-            // create new list
-            ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger));
-        } else {
-            // add to collection
-            ledgersByDimensionMap.get(metrics).add(ledger);
-        }
+        ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(ledger);
     }
 
     protected void populateDimensionMap(Map<Metrics, List<TopicStats>> topicsStatsByDimensionMap,
             Metrics metrics, TopicStats destStats) {
-        if (!topicsStatsByDimensionMap.containsKey(metrics)) {
-            // create new list
-            topicsStatsByDimensionMap.put(metrics, Lists.newArrayList(destStats));
-        } else {
-            // add to collection
-            topicsStatsByDimensionMap.get(metrics).add(destStats);
-        }
-
+        topicsStatsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList()).add(destStats);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
index c049366..8bacd0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStats.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.compaction.CompactionRecord;
 
 public class AggregatedNamespaceStats {
     public int topicsCount;
@@ -47,6 +49,16 @@ public class AggregatedNamespaceStats {
 
     public Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
 
+    long compactionRemovedEventCount;
+    long compactionSucceedCount;
+    long compactionFailedCount;
+    long compactionDurationTimeInMills;
+    double compactionReadThroughput;
+    double compactionWriteThroughput;
+    long compactionCompactedEntriesCount;
+    long compactionCompactedEntriesSize;
+    StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
+
     void updateStats(TopicStats stats) {
         topicsCount++;
 
@@ -112,6 +124,16 @@ public class AggregatedNamespaceStats {
                 consumerStats.unackedMessages += v.unackedMessages;
             });
         });
+
+        compactionRemovedEventCount += stats.compactionRemovedEventCount;
+        compactionSucceedCount += stats.compactionSucceedCount;
+        compactionFailedCount += stats.compactionFailedCount;
+        compactionDurationTimeInMills += stats.compactionDurationTimeInMills;
+        compactionReadThroughput += stats.compactionReadThroughput;
+        compactionWriteThroughput += stats.compactionWriteThroughput;
+        compactionCompactedEntriesCount += stats.compactionCompactedEntriesCount;
+        compactionCompactedEntriesSize += stats.compactionCompactedEntriesSize;
+        compactionLatencyBuckets.addAll(stats.compactionLatencyBuckets);
     }
 
     public void reset() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index e041a01..a2661ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -19,9 +19,13 @@
 package org.apache.pulsar.broker.stats.prometheus;
 
 import io.netty.util.concurrent.FastThreadLocal;
+import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -30,7 +34,11 @@ import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
 
+@Slf4j
 public class NamespaceStatsAggregator {
 
     private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
@@ -57,6 +65,7 @@ public class NamespaceStatsAggregator {
 
         printDefaultBrokerStats(stream, cluster);
 
+        Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
         LongAdder topicsCount = new LongAdder();
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
@@ -66,11 +75,13 @@ public class NamespaceStatsAggregator {
                 topicsMap.forEach((name, topic) -> {
                     getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
                             pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
-                            pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus());
+                            pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
+                            compactorMXBean
+                    );
 
                     if (includeTopicMetrics) {
                         topicsCount.add(1);
-                        TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats);
+                        TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean);
                     } else {
                         namespaceStats.updateStats(topicStats);
                     }
@@ -87,8 +98,19 @@ public class NamespaceStatsAggregator {
         });
     }
 
+    private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
+        Compactor compactor = null;
+        try {
+            compactor = pulsar.getCompactor(false);
+        } catch (PulsarServerException e) {
+            log.error("get compactor error", e);
+        }
+        return Optional.ofNullable(compactor).map(c -> c.getStats());
+    }
+
     private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
-            boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
+            boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
+                                      Optional<CompactorMXBean> compactorMXBean) {
         stats.reset();
 
         if (topic instanceof PersistentTopic) {
@@ -220,6 +242,31 @@ public class NamespaceStatsAggregator {
             aggReplStats.connectedCount += replStats.connected ? 1 : 0;
             aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds;
         });
+
+        compactorMXBean
+                .flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic.getName()))
+                .map(compactionRecord -> {
+                    stats.compactionRemovedEventCount = compactionRecord.getCompactionRemovedEventCount();
+                    stats.compactionSucceedCount = compactionRecord.getCompactionSucceedCount();
+                    stats.compactionFailedCount = compactionRecord.getCompactionFailedCount();
+                    stats.compactionDurationTimeInMills = compactionRecord.getCompactionDurationTimeInMills();
+                    stats.compactionReadThroughput = compactionRecord.getCompactionReadThroughput();
+                    stats.compactionWriteThroughput = compactionRecord.getCompactionWriteThroughput();
+                    stats.compactionLatencyBuckets.addAll(compactionRecord.getCompactionLatencyStats());
+                    stats.compactionLatencyBuckets.refresh();
+                    PersistentTopic persistentTopic = (PersistentTopic) topic;
+                    Optional<CompactedTopicContext> compactedTopicContext = persistentTopic
+                            .getCompactedTopicContext();
+                    if (compactedTopicContext.isPresent()) {
+                        LedgerHandle ledger = compactedTopicContext.get().getLedger();
+                        long entries = ledger.getLastAddConfirmed() + 1;
+                        long size = ledger.getLength();
+
+                        stats.compactionCompactedEntriesCount = entries;
+                        stats.compactionCompactedEntriesSize = size;
+                    }
+                    return compactionRecord;
+                });
     }
 
     private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 9474ecb..61e0175 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -20,7 +20,11 @@ package org.apache.pulsar.broker.stats.prometheus;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.apache.pulsar.compaction.CompactionRecord;
+import org.apache.pulsar.compaction.CompactorMXBean;
 
 class TopicStats {
 
@@ -51,6 +55,16 @@ class TopicStats {
     // Used for tracking duplicate TYPE definitions
     static Map<String, String> metricWithTypeDefinition = new HashMap<>();
 
+    // For compaction
+    long compactionRemovedEventCount;
+    long compactionSucceedCount;
+    long compactionFailedCount;
+    long compactionDurationTimeInMills;
+    double compactionReadThroughput;
+    double compactionWriteThroughput;
+    long compactionCompactedEntriesCount;
+    long compactionCompactedEntriesSize;
+    StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
 
     public void reset() {
         subscriptionsCount = 0;
@@ -73,6 +87,16 @@ class TopicStats {
         replicationStats.clear();
         subscriptionStats.clear();
         producerStats.clear();
+
+        compactionRemovedEventCount = 0;
+        compactionSucceedCount = 0;
+        compactionFailedCount = 0;
+        compactionDurationTimeInMills = 0;
+        compactionReadThroughput = 0;
+        compactionWriteThroughput = 0;
+        compactionCompactedEntriesCount = 0;
+        compactionCompactedEntriesSize = 0;
+        compactionLatencyBuckets.reset();
     }
 
     static void resetTypes() {
@@ -80,7 +104,7 @@ class TopicStats {
     }
 
     static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
-                                TopicStats stats) {
+                                TopicStats stats, Optional<CompactorMXBean> compactorMXBean) {
         metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
         metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
         metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
@@ -250,6 +274,53 @@ class TopicStats {
 
         metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter);
         metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter);
+
+        // Compaction
+        boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
+                .map(__ -> true).orElse(false);
+        if (hasCompaction) {
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
+                    stats.compactionRemovedEventCount);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
+                    stats.compactionSucceedCount);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
+                    stats.compactionFailedCount);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
+                    stats.compactionDurationTimeInMills);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
+                    stats.compactionReadThroughput);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
+                    stats.compactionWriteThroughput);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
+                    stats.compactionCompactedEntriesCount);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
+                    stats.compactionCompactedEntriesSize);
+            long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
+                    compactionLatencyBuckets[0]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
+                    compactionLatencyBuckets[1]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
+                    compactionLatencyBuckets[2]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
+                    compactionLatencyBuckets[3]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
+                    compactionLatencyBuckets[4]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
+                    compactionLatencyBuckets[5]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
+                    compactionLatencyBuckets[6]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
+                    compactionLatencyBuckets[7]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
+                    compactionLatencyBuckets[8]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
+                    compactionLatencyBuckets[9]);
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
+                    stats.compactionLatencyBuckets.getSum());
+            metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
+                    stats.compactionLatencyBuckets.getCount());
+        }
     }
 
     static void metricType(SimpleTextOutputStream stream, String name) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java
new file mode 100644
index 0000000..76b0642
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicContext.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.compaction;
+
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import lombok.Getter;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.pulsar.common.api.proto.MessageIdData;
+
+@Getter
+public class CompactedTopicContext {
+
+    final LedgerHandle ledger;
+    final AsyncLoadingCache<Long, MessageIdData> cache;
+
+    public CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache<Long, MessageIdData> cache) {
+        this.ledger = ledger;
+        this.cache = cache;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index 21e9a1d..b2f2d87 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -30,7 +30,6 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import lombok.Getter;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -274,17 +273,6 @@ public class CompactedTopicImpl implements CompactedTopic {
                 });
     }
 
-    @Getter
-    public static class CompactedTopicContext {
-        final LedgerHandle ledger;
-        final AsyncLoadingCache<Long, MessageIdData> cache;
-
-        CompactedTopicContext(LedgerHandle ledger, AsyncLoadingCache<Long, MessageIdData> cache) {
-            this.ledger = ledger;
-            this.cache = cache;
-        }
-    }
-
     /**
      * Getter for CompactedTopicContext.
      * @return CompactedTopicContext
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
new file mode 100644
index 0000000..4a82743
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactionRecord.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import lombok.Getter;
+import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.pulsar.common.stats.Rate;
+
+public class CompactionRecord {
+
+    public static final long[] WRITE_LATENCY_BUCKETS_USEC = { 500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
+            200_000, 1000_000 };
+
+    @Getter
+    private long lastCompactionRemovedEventCount = 0L;
+    @Getter
+    private long lastCompactionSucceedTimestamp = 0L;
+    @Getter
+    private long lastCompactionFailedTimestamp = 0L;
+    @Getter
+    private long lastCompactionDurationTimeInMills = 0L;
+
+    private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
+    private long lastCompactionStartTimeOp;
+
+    private final LongAdder compactionRemovedEventCount = new LongAdder();
+    private final LongAdder compactionSucceedCount = new LongAdder();
+    private final LongAdder compactionFailedCount = new LongAdder();
+    private final LongAdder compactionDurationTimeInMills = new LongAdder();
+    public final StatsBuckets writeLatencyStats = new StatsBuckets(WRITE_LATENCY_BUCKETS_USEC);
+    public final Rate writeRate = new Rate();
+    public final Rate readRate = new Rate();
+
+    public void reset() {
+        compactionRemovedEventCount.reset();
+        compactionSucceedCount.reset();
+        compactionFailedCount.reset();
+        compactionDurationTimeInMills.reset();
+        writeLatencyStats.reset();
+    }
+
+    public void addCompactionRemovedEvent() {
+        lastCompactionRemovedEventCountOp.increment();
+        compactionRemovedEventCount.increment();
+    }
+
+    public void addCompactionStartOp() {
+        lastCompactionRemovedEventCountOp.reset();
+        lastCompactionStartTimeOp = System.currentTimeMillis();
+    }
+
+    public void addCompactionEndOp(boolean succeed) {
+        lastCompactionDurationTimeInMills = System.currentTimeMillis()
+                - lastCompactionStartTimeOp;
+        compactionDurationTimeInMills.add(lastCompactionDurationTimeInMills);
+        lastCompactionRemovedEventCount = lastCompactionRemovedEventCountOp.longValue();
+        if (succeed) {
+            lastCompactionSucceedTimestamp = System.currentTimeMillis();
+            compactionSucceedCount.increment();
+        } else {
+            lastCompactionFailedTimestamp = System.currentTimeMillis();
+            compactionFailedCount.increment();
+        }
+    }
+
+    public void addCompactionReadOp(long readableBytes) {
+        readRate.recordEvent(readableBytes);
+    }
+
+    public void addCompactionWriteOp(long writeableBytes) {
+        writeRate.recordEvent(writeableBytes);
+    }
+
+    public void addCompactionLatencyOp(long latency, TimeUnit unit) {
+        writeLatencyStats.addValue(unit.toMicros(latency));
+    }
+
+    public long getCompactionRemovedEventCount() {
+        return compactionRemovedEventCount.longValue();
+    }
+
+    public long getCompactionSucceedCount() {
+        return compactionSucceedCount.longValue();
+    }
+
+    public long getCompactionFailedCount() {
+        return compactionFailedCount.longValue();
+    }
+
+    public long getCompactionDurationTimeInMills() {
+        return compactionDurationTimeInMills.longValue();
+    }
+
+    public long[] getCompactionLatencyBuckets() {
+        writeLatencyStats.refresh();
+        return writeLatencyStats.getBuckets();
+    }
+
+    public StatsBuckets getCompactionLatencyStats() {
+        return writeLatencyStats;
+    }
+
+    public double getCompactionReadThroughput() {
+        readRate.calculateRate();
+        return readRate.getValueRate();
+    }
+
+    public double getCompactionWriteThroughput() {
+        writeRate.calculateRate();
+        return writeRate.getValueRate();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
index 54ca2e8..7786e19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.compaction;
 
+import java.util.Optional;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
 
@@ -29,29 +30,14 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability;
 public interface CompactorMXBean {
 
     /**
-     * @return the removed event count of last compaction
-     */
-    long getLastCompactionRemovedEventCount(String topic);
-
-    /**
-     * @return the timestamp of last succeed compaction
-     */
-    long getLastCompactionSucceedTimestamp(String topic);
-
-    /**
-     * @return the timestamp of last failed compaction
-     */
-    long getLastCompactionFailedTimestamp(String topic);
-
-    /**
-     * @return the duration time of last compaction
-     */
-    long getLastCompactionDurationTimeInMills(String topic);
-
-    /**
      *  Remove metrics about this topic.
      * @param topic
      */
     void removeTopic(String topic);
 
+    /**
+     *  Get the compaction record of the topic.
+     * @param topic
+     */
+    Optional<CompactionRecord> getCompactionRecordForTopic(String topic);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
index 05db2aa..7b63127 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
@@ -18,75 +18,54 @@
  */
 package org.apache.pulsar.compaction;
 
+import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.TimeUnit;
 
 public class CompactorMXBeanImpl implements CompactorMXBean {
 
-    private final ConcurrentHashMap<String, CompactRecord> compactRecordOps = new ConcurrentHashMap<>();
+    private final ConcurrentHashMap<String, CompactionRecord> compactionRecordOps = new ConcurrentHashMap<>();
 
     public void addCompactionRemovedEvent(String topic) {
-        compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent();
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionRemovedEvent();
     }
 
     public void addCompactionStartOp(String topic) {
-        compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset();
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionStartOp();
     }
 
     public void addCompactionEndOp(String topic, boolean succeed) {
-        CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord());
-        compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis()
-                - compactRecord.lastCompactionStartTimeOp;
-        compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue();
-        if (succeed) {
-            compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis();
-        } else {
-            compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis();
-        }
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionEndOp(succeed);
     }
 
     @Override
-    public long getLastCompactionRemovedEventCount(String topic) {
-        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount;
+    public void removeTopic(String topic) {
+        compactionRecordOps.remove(topic);
     }
 
     @Override
-    public long getLastCompactionSucceedTimestamp(String topic) {
-        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp;
+    public Optional<CompactionRecord> getCompactionRecordForTopic(String topic) {
+        return Optional.ofNullable(compactionRecordOps.get(topic));
     }
 
-    @Override
-    public long getLastCompactionFailedTimestamp(String topic) {
-        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp;
+    public Set<String> getTopics() {
+        return compactionRecordOps.keySet();
     }
 
-    @Override
-    public long getLastCompactionDurationTimeInMills(String topic) {
-        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills;
+    public void reset() {
+        compactionRecordOps.values().forEach(CompactionRecord::reset);
     }
 
-    @Override
-    public void removeTopic(String topic) {
-        compactRecordOps.remove(topic);
+    public void addCompactionReadOp(String topic, long readableBytes) {
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionReadOp(readableBytes);
     }
 
-    static class CompactRecord {
-
-        private long lastCompactionRemovedEventCount = 0L;
-        private long lastCompactionSucceedTimestamp = 0L;
-        private long lastCompactionFailedTimestamp = 0L;
-        private long lastCompactionDurationTimeInMills = 0L;
-
-        private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
-        private long lastCompactionStartTimeOp;
-
-        public void addCompactionRemovedEvent() {
-            lastCompactionRemovedEventCountOp.increment();
-        }
+    public void addCompactionWriteOp(String topic, long writeableBytes) {
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionWriteOp(writeableBytes);
+    }
 
-        public void reset() {
-            lastCompactionRemovedEventCountOp.reset();
-            lastCompactionStartTimeOp = System.currentTimeMillis();
-        }
+    public void addCompactionLatencyOp(String topic, long latency, TimeUnit unit) {
+        compactionRecordOps.computeIfAbsent(topic, k -> new CompactionRecord()).addCompactionLatencyOp(latency, unit);
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 49d121f..ded798c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -125,6 +126,7 @@ public class TwoPhaseCompactor extends Compactor {
                 MessageId id = m.getMessageId();
                 boolean deletedMessage = false;
                 boolean replaceMessage = false;
+                mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
                         for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
@@ -234,6 +236,7 @@ public class TwoPhaseCompactor extends Compactor {
             try {
                 MessageId id = m.getMessageId();
                 Optional<RawMessage> messageToAdd = Optional.empty();
+                mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
                         messageToAdd = RawBatchConverter.rebatchMessage(
@@ -262,7 +265,7 @@ public class TwoPhaseCompactor extends Compactor {
                     RawMessage message = messageToAdd.get();
                     try {
                         outstanding.acquire();
-                        CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message)
+                        CompletableFuture<Void> addFuture = addToCompactedLedger(lh, message, reader.getTopic())
                                 .whenComplete((res, exception2) -> {
                                     outstanding.release();
                                     if (exception2 != null) {
@@ -363,12 +366,15 @@ public class TwoPhaseCompactor extends Compactor {
         return bkf;
     }
 
-    private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage m) {
+    private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage m, String topic) {
         CompletableFuture<Void> bkf = new CompletableFuture<>();
         ByteBuf serialized = m.serialize();
         try {
+            mxBean.addCompactionWriteOp(topic, m.getHeadersAndPayload().readableBytes());
+            long start = System.nanoTime();
             lh.asyncAddEntry(serialized,
                     (rc, ledger, eid, ctx) -> {
+                        mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS);
                         if (rc != BKException.Code.OK) {
                             bkf.completeExceptionally(BKException.create(rc));
                         } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 7387915..3de3a04 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -27,6 +27,7 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.jsonwebtoken.SignatureAlgorithm;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -43,14 +44,18 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Random;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.crypto.SecretKey;
 import javax.naming.AuthenticationException;
 import lombok.Cleanup;
+import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.commons.io.IOUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
@@ -63,10 +68,13 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -1136,6 +1144,91 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         parseMetrics(sampleMetrics);
     }
 
+    @Test
+    public void testCompaction() throws Exception {
+        final String topicName = "persistent://my-namespace/use/my-ns/my-compaction1";
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        String metricsStr = statsOut.toString();
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_compaction_removed_event_count");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_succeed_count");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_failed_count");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_duration_time_in_mills");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_read_throughput");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_write_throughput");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_count");
+        assertEquals(cm.size(), 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_size");
+        assertEquals(cm.size(), 0);
+        //
+        final int numMessages = 1000;
+        final int maxKeys = 10;
+        Random r = new Random(0);
+        for (int j = 0; j < numMessages; j++) {
+            int keyIndex = r.nextInt(maxKeys);
+            String key = "key"+keyIndex;
+            byte[] data = ("my-message-" + key + "-" + j).getBytes();
+            producer.newMessage()
+                    .key(key)
+                    .value(data)
+                    .send();
+        }
+        ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build());
+        Compactor compactor = pulsar.getCompactor(true);
+        compactor.compact(topicName).get();
+        statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
+        metricsStr = statsOut.toString();
+        metrics = parseMetrics(metricsStr);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_removed_event_count");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 990);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_succeed_count");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 1);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_failed_count");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_duration_time_in_mills");
+        assertEquals(cm.size(), 1);
+        assertTrue(cm.get(0).value > 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_read_throughput");
+        assertEquals(cm.size(), 1);
+        assertTrue(cm.get(0).value > 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_write_throughput");
+        assertEquals(cm.size(), 1);
+        assertTrue(cm.get(0).value > 0);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_count");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 10);
+        cm = (List<Metric>) metrics.get("pulsar_compaction_compacted_entries_size");
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).value, 870);
+
+        pulsarClient.close();
+    }
+
+    private void compareCompactionStateCount(List<Metric> cm, double count) {
+        assertEquals(cm.size(), 1);
+        assertEquals(cm.get(0).tags.get("cluster"), "test");
+        assertEquals(cm.get(0).tags.get("broker"), "localhost");
+        assertEquals(cm.get(0).value, count);
+    }
+
     /**
      * Hacky parsing of Prometheus text format. Should be good enough for unit tests
      */
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
index 9784928..39ecc42 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/AggregatedNamespaceStatsTest.java
@@ -43,6 +43,14 @@ public class AggregatedNamespaceStatsTest {
         topicStats1.msgBacklog = 30;
         topicStats1.managedLedgerStats.storageWriteRate = 12.0;
         topicStats1.managedLedgerStats.storageReadRate = 6.0;
+        topicStats1.compactionRemovedEventCount = 10;
+        topicStats1.compactionSucceedCount = 1;
+        topicStats1.compactionFailedCount = 2;
+        topicStats1.compactionDurationTimeInMills = 1000;
+        topicStats1.compactionReadThroughput = 15.0;
+        topicStats1.compactionWriteThroughput = 20.0;
+        topicStats1.compactionCompactedEntriesCount = 30;
+        topicStats1.compactionCompactedEntriesSize = 1000;
 
         AggregatedReplicationStats replStats1 = new AggregatedReplicationStats();
         replStats1.msgRateIn = 1.0;
@@ -75,6 +83,14 @@ public class AggregatedNamespaceStatsTest {
         topicStats2.msgBacklog = 7;
         topicStats2.managedLedgerStats.storageWriteRate = 5.0;
         topicStats2.managedLedgerStats.storageReadRate = 2.5;
+        topicStats2.compactionRemovedEventCount = 10;
+        topicStats2.compactionSucceedCount = 1;
+        topicStats2.compactionFailedCount = 2;
+        topicStats2.compactionDurationTimeInMills = 1000;
+        topicStats2.compactionReadThroughput = 15.0;
+        topicStats2.compactionWriteThroughput = 20.0;
+        topicStats2.compactionCompactedEntriesCount = 30;
+        topicStats2.compactionCompactedEntriesSize = 1000;
 
         AggregatedReplicationStats replStats2 = new AggregatedReplicationStats();
         replStats2.msgRateIn = 3.5;
@@ -113,6 +129,15 @@ public class AggregatedNamespaceStatsTest {
         assertEquals(nsStats.managedLedgerStats.storageSize, 6144);
         assertEquals(nsStats.managedLedgerStats.storageLogicalSize, 2560);
 
+        assertEquals(nsStats.compactionRemovedEventCount, 20);
+        assertEquals(nsStats.compactionSucceedCount, 2);
+        assertEquals(nsStats.compactionFailedCount, 4);
+        assertEquals(nsStats.compactionDurationTimeInMills, 2000);
+        assertEquals(nsStats.compactionReadThroughput, 30.0);
+        assertEquals(nsStats.compactionWriteThroughput, 40.0);
+        assertEquals(nsStats.compactionCompactedEntriesCount, 60);
+        assertEquals(nsStats.compactionCompactedEntriesSize, 2000);
+
         AggregatedReplicationStats nsReplStats = nsStats.replicationStats.get(namespace);
         assertNotNull(nsReplStats);
         assertEquals(nsReplStats.msgRateIn, 4.5);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
index b865396..a3f2cae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
@@ -18,12 +18,9 @@
  */
 package org.apache.pulsar.compaction;
 
-import org.apache.pulsar.broker.service.BrokerService;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
-import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -36,15 +33,38 @@ public class CompactorMXBeanImplTest {
         CompactorMXBeanImpl mxBean = new CompactorMXBeanImpl();
         String topic = "topic1";
         mxBean.addCompactionStartOp(topic);
-        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+        CompactionRecord compaction = mxBean.getCompactionRecordForTopic(topic).get();
+        assertEquals(compaction.getLastCompactionRemovedEventCount(), 0, 0);
         mxBean.addCompactionRemovedEvent(topic);
-        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+        assertEquals(compaction.getLastCompactionRemovedEventCount(), 0, 0);
         mxBean.addCompactionEndOp(topic, true);
+        assertEquals(compaction.getLastCompactionRemovedEventCount(), 1, 0);
+        assertTrue(compaction.getLastCompactionSucceedTimestamp() > 0L);
+        assertTrue(compaction.getLastCompactionDurationTimeInMills() >= 0L);
+        assertEquals(compaction.getLastCompactionFailedTimestamp(), 0, 0);
+        assertEquals(compaction.getCompactionRemovedEventCount(), 1, 0);
+        assertEquals(compaction.getCompactionSucceedCount(), 1, 0);
+        assertEquals(compaction.getCompactionFailedCount(), 0, 0);
+        assertTrue(compaction.getCompactionDurationTimeInMills() >= 0L);
+        mxBean.addCompactionStartOp(topic);
+        mxBean.addCompactionRemovedEvent(topic);
         mxBean.addCompactionEndOp(topic, false);
-        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 1, 0);
-        assertTrue(mxBean.getLastCompactionSucceedTimestamp(topic) > 0L);
-        assertTrue(mxBean.getLastCompactionFailedTimestamp(topic) > 0L);
-        assertTrue(mxBean.getLastCompactionDurationTimeInMills(topic) >= 0L);
+        assertEquals(compaction.getCompactionRemovedEventCount(), 2, 0);
+        assertEquals(compaction.getCompactionFailedCount(), 1, 0);
+        assertEquals(compaction.getCompactionSucceedCount(), 1, 0);
+        assertTrue(compaction.getLastCompactionFailedTimestamp() > 0L);
+        assertTrue(compaction.getCompactionDurationTimeInMills() >= 0L);
+        mxBean.addCompactionReadOp(topic, 22);
+        assertTrue(compaction.getCompactionReadThroughput() > 0L);
+        mxBean.addCompactionWriteOp(topic, 33);
+        assertTrue(compaction.getCompactionWriteThroughput() > 0L);
+        mxBean.addCompactionLatencyOp(topic, 10, TimeUnit.NANOSECONDS);
+        assertTrue(compaction.getCompactionLatencyBuckets()[0] > 0l);
+        mxBean.reset();
+        assertEquals(compaction.getCompactionRemovedEventCount(), 0, 0);
+        assertEquals(compaction.getCompactionSucceedCount(), 0, 0);
+        assertEquals(compaction.getCompactionFailedCount(), 0, 0);
+        assertEquals(compaction.getCompactionDurationTimeInMills(), 0, 0);
     }
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 8197c4a..3381aee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -113,10 +113,11 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
             m.close();
         }
         if (checkMetrics) {
-            long compactedTopicRemovedEventCount = compactor.getStats().getLastCompactionRemovedEventCount(topic);
-            long lastCompactSucceedTimestamp = compactor.getStats().getLastCompactionSucceedTimestamp(topic);
-            long lastCompactFailedTimestamp = compactor.getStats().getLastCompactionFailedTimestamp(topic);
-            long lastCompactDurationTimeInMills = compactor.getStats().getLastCompactionDurationTimeInMills(topic);
+            CompactionRecord compactionRecord = compactor.getStats().getCompactionRecordForTopic(topic).get();
+            long compactedTopicRemovedEventCount = compactionRecord.getLastCompactionRemovedEventCount();
+            long lastCompactSucceedTimestamp = compactionRecord.getLastCompactionSucceedTimestamp();
+            long lastCompactFailedTimestamp = compactionRecord.getLastCompactionFailedTimestamp();
+            long lastCompactDurationTimeInMills = compactionRecord.getLastCompactionDurationTimeInMills();
             Assert.assertTrue(compactedTopicRemovedEventCount >= 1);
             Assert.assertTrue(lastCompactSucceedTimestamp >= 1L);
             Assert.assertTrue(lastCompactDurationTimeInMills >= 0L);
diff --git a/site2/docs/reference-metrics.md b/site2/docs/reference-metrics.md
index 839029b..3124e18 100644
--- a/site2/docs/reference-metrics.md
+++ b/site2/docs/reference-metrics.md
@@ -204,6 +204,16 @@ All the topic metrics are labelled with the following labels:
 | pulsar_in_messages_total | Counter | The total number of messages received for this topic |
 | pulsar_out_bytes_total | Counter | The total number of bytes read from this topic |
 | pulsar_out_messages_total | Counter | The total number of messages read from this topic |
+| pulsar_compaction_removed_event_count | Gauge | The removed event count of compaction |
+| pulsar_compaction_succeed_count | Gauge | The succeed count of compaction |
+| pulsar_compaction_failed_count | Gauge | The failed count of compaction |
+| pulsar_compaction_duration_time_in_mills | Gauge | The duration time of compaction |
+| pulsar_compaction_read_throughput | Gauge | The read throughput of compaction |
+| pulsar_compaction_write_throughput | Gauge | The write throughput of compaction |
+| pulsar_compaction_latency_le_* | Histogram | The compaction latency with given quantile. <br> Available thresholds: <br><ul><li>pulsar_compaction_latency_le_0_5: <= 0.5ms </li><li>pulsar_compaction_latency_le_1: <= 1ms</li><li>pulsar_compaction_latency_le_5: <= 5ms</li><li>pulsar_compaction_latency_le_10: <= 10ms</li><li>pulsar_compaction_latency_le_20: <= 20ms</li><li>pulsar_compaction_latency_le_50: <= 50ms</li><li>pulsar_compaction_latency_le_100: <= 100ms</li><li>pulsar_compaction_ [...]
+| pulsar_compaction_compacted_entries_count | Gauge | The compacted entries count |
+| pulsar_compaction_compacted_entries_size |Gauge  | The compacted entries size |
+
 
 #### Replication metrics
 

[pulsar] 01/05: [python-client] Fixed crash when using Python logger (#10981)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 875bfdfc879bdef09be3c6cebeaa43fdce192ebb
Author: Livio BenĨik <lb...@gmail.com>
AuthorDate: Wed Jul 14 09:02:34 2021 +0200

    [python-client] Fixed crash when using Python logger (#10981)
    
    ### Motivation
    
    In some cases, the Python client would crash when using the new `logger` option. This happens when a Pulsar message is sent asynchronously, but soon after the program exits (and even then, not always).
    
    For example, when doing Django migrations which include sending a message:
    ```
    ...
    [2021-06-19 06:53:57.691] [INFO]: Created connection for pulsar://localhost:6650
    [2021-06-19 06:53:57.693] [INFO]: [127.0.0.1:36536 -> 127.0.0.1:6650] Connected to broker
    [2021-06-19 06:53:57.695] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Getting connection from pool
    [2021-06-19 06:53:57.707] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Created producer on broker [127.0.0.1:36536 -> 127.0.0.1:6650]
    ...
    [2021-06-19 06:53:57.728] [DEBUG]: Sending message to topic .....
      Applying dashboard.0001_initial... OK
      Applying templating.0001_initial... OK
    Error in sys.excepthook:
    
    Original exception was:
    Failed to migrate dashboard! Return code was: -6
    ```
    
    This happens because Pulsar tries to log messages after Python already started finalizing, so the client can't get a GIL lock, which crashes the whole client.
    
    ### Modifications
    
    Following the instructions at https://docs.python.org/3/c-api/init.html#c.PyGILState_Ensure, I added a check for when Python is finalizing, and if it is, we fallback to the default console logger (the log level is still respected correctly).
    
    Now it looks like this:
    ```
    ...
    [2021-06-19 06:45:15.561] [INFO]: Created connection for pulsar://localhost:6650
    [2021-06-19 06:45:15.563] [INFO]: [127.0.0.1:35930 -> 127.0.0.1:6650] Connected to broker
    [2021-06-19 06:45:15.568] [INFO]: [persistent://public/default/dashboard-global_context-emit, ] Getting connection from pool
    [2021-06-19 06:45:15.586] [INFO]: [persistent://public/default/zaba-dashboard-global_context-emit, ] Created producer on broker [127.0.0.1:35930 -> 127.0.0.1:6650]
    ...
    [2021-06-19 06:45:15.604] [DEBUG]: Sending message to topic .....
      Applying dashboard.0001_initial... OK
      Applying templating.0001_initial... OK
    2021-06-19 06:45:16.200 INFO  [139853253269312] ClientConnection:1446 | [127.0.0.1:35930 -> 127.0.0.1:6650] Connection closed
    2021-06-19 06:45:16.200 ERROR [139853099652672] ClientConnection:531 | [127.0.0.1:35930 -> 127.0.0.1:6650] Read failed: Operation canceled
    2021-06-19 06:45:16.201 INFO  [139853253269312] ClientConnection:261 | [127.0.0.1:35930 -> 127.0.0.1:6650] Destroyed connection
    2021-06-19 06:45:16.201 INFO  [139853253269312] ProducerImpl:561 | Producer - [persistent://public/default/dashboard-global_context-emit, standalone-0-120] , [batchMessageContainer = { BatchMessageContainer [size = 0] [bytes = 0] [maxSize = 1000] [maxBytes = 131072] [topicName = persistent://public/default/dashboard-global_context-emit] [numberOfBatchesSent_ = 1] [averageBatchSize_ = 1] }]
    Successfully migrated dashboard
    ```
    
    (cherry picked from commit fc8ce64b1328945ab8e06aad56151294295f003a)
---
 pulsar-client-cpp/python/src/config.cc | 65 +++++++++++++++++-----------------
 1 file changed, 32 insertions(+), 33 deletions(-)

diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index b665ec7..0b30713 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include "utils.h"
+#include <pulsar/ConsoleLoggerFactory.h>
 
 template<typename T>
 struct ListenerWrapper {
@@ -90,6 +91,7 @@ static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerC
 
 class LoggerWrapper: public Logger {
     PyObject* _pyLogger;
+    Logger* fallbackLogger;
     int _currentPythonLogLevel = _getLogLevelValue(Logger::LEVEL_INFO);
 
     void _updateCurrentPythonLogLevel() {
@@ -110,26 +112,19 @@ class LoggerWrapper: public Logger {
 
    public:
 
-    LoggerWrapper(const std::string &logger, PyObject* pyLogger) {
+    LoggerWrapper(const std::string &filename, PyObject* pyLogger) {
         _pyLogger = pyLogger;
         Py_XINCREF(_pyLogger);
 
-        _updateCurrentPythonLogLevel();
-    }
-
-    LoggerWrapper(const LoggerWrapper& other) {
-        _pyLogger = other._pyLogger;
-        Py_XINCREF(_pyLogger);
-    }
+        std::unique_ptr<LoggerFactory> factory(new ConsoleLoggerFactory());
+        fallbackLogger = factory->getLogger(filename);
 
-    LoggerWrapper& operator=(const LoggerWrapper& other) {
-        _pyLogger = other._pyLogger;
-        Py_XINCREF(_pyLogger);
-        return *this;
+        _updateCurrentPythonLogLevel();
     }
 
     virtual ~LoggerWrapper() {
         Py_XDECREF(_pyLogger);
+        delete fallbackLogger;
     }
 
     bool isEnabled(Level level) {
@@ -137,34 +132,38 @@ class LoggerWrapper: public Logger {
     }
 
     void log(Level level, int line, const std::string& message) {
-        PyGILState_STATE state = PyGILState_Ensure();
-
-        try {
-            switch (level) {
-                case Logger::LEVEL_DEBUG:
-                    py::call_method<void>(_pyLogger, "debug", message.c_str());
-                    break;
-                case Logger::LEVEL_INFO:
-                    py::call_method<void>(_pyLogger, "info", message.c_str());
-                    break;
-                case Logger::LEVEL_WARN:
-                    py::call_method<void>(_pyLogger, "warning", message.c_str());
-                    break;
-                case Logger::LEVEL_ERROR:
-                    py::call_method<void>(_pyLogger, "error", message.c_str());
-                    break;
+        if (Py_IsInitialized() != true) {
+            // Python logger is unavailable - fallback to console logger
+            fallbackLogger->log(level, line, message);
+        } else {
+            PyGILState_STATE state = PyGILState_Ensure();
+
+            try {
+                switch (level) {
+                    case Logger::LEVEL_DEBUG:
+                        py::call_method<void>(_pyLogger, "debug", message.c_str());
+                        break;
+                    case Logger::LEVEL_INFO:
+                        py::call_method<void>(_pyLogger, "info", message.c_str());
+                        break;
+                    case Logger::LEVEL_WARN:
+                        py::call_method<void>(_pyLogger, "warning", message.c_str());
+                        break;
+                    case Logger::LEVEL_ERROR:
+                        py::call_method<void>(_pyLogger, "error", message.c_str());
+                        break;
+                }
+
+            } catch (py::error_already_set e) {
+                PyErr_Print();
             }
 
-        } catch (py::error_already_set e) {
-            PyErr_Print();
+            PyGILState_Release(state);
         }
-
-        PyGILState_Release(state);
     }
 };
 
 class LoggerWrapperFactory : public LoggerFactory {
-    static LoggerWrapperFactory* _instance;
     PyObject* _pyLogger;
 
    public:

[pulsar] 02/05: Add compacted topic metrics for TopicStats in CLI (#11564)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 71cc13be016646da47ba7298c585ae1f484507a5
Author: GuoJiwei <te...@apache.org>
AuthorDate: Sat Aug 21 22:58:20 2021 +0800

    Add compacted topic metrics for TopicStats in CLI (#11564)
    
    Add below metrics to help track potential flows or examine the overall condition of compacted topics .
    - lastCompactionRemovedEventCount : the removed event count of last compaction
    - lastCompactionSucceedTimestamp : the timestamp of last succeed compaction
    - lastCompactionFailedTimestamp : the timestamp of last failed compaction
    - lastCompactionDurationTimeInMills: the duration time of last compaction
    
    These 4 metrics will be displayed in topic stats CLI :
    ```
    ./pulsar-admin topics stats persistent://tenant/ns/topic
    ```
    
    This patch will add metrics in CLI , which would generate doc automatically.
    
    (cherry picked from commit c0ef593990ce8a7ea9ee6f1def880f71def3fc97)
---
 .../org/apache/pulsar/broker/PulsarService.java    | 14 ++--
 .../pulsar/broker/service/BrokerService.java       | 13 ++-
 .../broker/service/persistent/PersistentTopic.java | 22 ++++++
 .../org/apache/pulsar/compaction/Compactor.java    | 29 ++++---
 .../apache/pulsar/compaction/CompactorMXBean.java  | 57 ++++++++++++++
 .../pulsar/compaction/CompactorMXBeanImpl.java     | 92 ++++++++++++++++++++++
 .../pulsar/compaction/TwoPhaseCompactor.java       | 14 +++-
 .../pulsar/compaction/CompactorMXBeanImplTest.java | 50 ++++++++++++
 .../apache/pulsar/compaction/CompactorTest.java    | 20 +++--
 .../common/policies/data/CompactionStats.java      | 37 +++++++++
 .../pulsar/common/policies/data/TopicStats.java    |  3 +
 .../policies/data/stats/CompactionStatsImpl.java   | 47 +++++++++++
 .../common/policies/data/stats/TopicStatsImpl.java |  6 +-
 .../policies/data/PersistentTopicStatsTest.java    |  4 +
 14 files changed, 383 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f7a1337..c09ec45 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1313,12 +1313,16 @@ public class PulsarService implements AutoCloseable {
     // only public so mockito can mock it
     public Compactor newCompactor() throws PulsarServerException {
         return new TwoPhaseCompactor(this.getConfiguration(),
-                                     getClient(), getBookKeeperClient(),
-                                     getCompactorExecutor());
+                getClient(), getBookKeeperClient(),
+                getCompactorExecutor());
     }
 
     public synchronized Compactor getCompactor() throws PulsarServerException {
-        if (this.compactor == null) {
+        return getCompactor(true);
+    }
+
+    public synchronized Compactor getCompactor(boolean shouldInitialize) throws PulsarServerException {
+        if (this.compactor == null && shouldInitialize) {
             this.compactor = newCompactor();
         }
         return this.compactor;
@@ -1327,8 +1331,8 @@ public class PulsarService implements AutoCloseable {
     protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) {
         if (this.offloaderScheduler == null) {
             this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder()
-                .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
-                .name("offloader").build();
+                    .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads())
+                    .name("offloader").build();
         }
         return this.offloaderScheduler;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ce9abcb..74fde84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -164,6 +164,10 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.common.util.netty.ChannelFutures;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.common.util.netty.NettyFutureUtil;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
@@ -1770,8 +1774,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 }
             }
         }
-
         topics.remove(topic);
+
+        try {
+            Compactor compactor = pulsar.getCompactor(false);
+            if (compactor != null) {
+                compactor.getStats().removeTopic(topic);
+            }
+        } catch (PulsarServerException ignore) {
+        }
     }
 
     public int getNumberOfNamespaceBundles() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2808021..b017750 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -150,6 +150,8 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
@@ -1899,9 +1901,29 @@ public class PersistentTopic extends AbstractTopic
         stats.lastOffloadLedgerId = ledger.getLastOffloadedLedgerId();
         stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
         stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
+        Optional<CompactorMXBean> mxBean = getCompactorMXBean();
+        stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
+                stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
+        stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
+                stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
+        stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
+                stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
+        stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
+                stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);
+
         return stats;
     }
 
+    private Optional<CompactorMXBean> getCompactorMXBean() {
+        Compactor compactor = null;
+        try {
+            compactor = brokerService.pulsar().getCompactor(false);
+        } catch (PulsarServerException ex) {
+            log.warn("get compactor error", ex);
+        }
+        return Optional.ofNullable(compactor).map(c -> c.getStats());
+    }
+
     @Override
     public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
index 61032d6..cb631e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java
@@ -42,6 +42,7 @@ public abstract class Compactor {
     protected final ScheduledExecutorService scheduler;
     private final PulsarClient pulsar;
     private final BookKeeper bk;
+    protected final CompactorMXBeanImpl mxBean;
 
     public Compactor(ServiceConfiguration conf,
                      PulsarClient pulsar,
@@ -51,6 +52,7 @@ public abstract class Compactor {
         this.scheduler = scheduler;
         this.pulsar = pulsar;
         this.bk = bk;
+        this.mxBean = new CompactorMXBeanImpl();
     }
 
     public CompletableFuture<Long> compact(String topic) {
@@ -60,23 +62,30 @@ public abstract class Compactor {
 
     private CompletableFuture<Long> compactAndCloseReader(RawReader reader) {
         CompletableFuture<Long> promise = new CompletableFuture<>();
+        mxBean.addCompactionStartOp(reader.getTopic());
         doCompaction(reader, bk).whenComplete(
                 (ledgerId, exception) -> {
                     reader.closeAsync().whenComplete((v, exception2) -> {
-                            if (exception2 != null) {
-                                log.warn("Error closing reader handle {}, ignoring", reader, exception2);
-                            }
-                            if (exception != null) {
-                                // complete with original exception
-                                promise.completeExceptionally(exception);
-                            } else {
-                                promise.complete(ledgerId);
-                            }
-                        });
+                        if (exception2 != null) {
+                            log.warn("Error closing reader handle {}, ignoring", reader, exception2);
+                        }
+                        if (exception != null) {
+                            // complete with original exception
+                            mxBean.addCompactionEndOp(reader.getTopic(), false);
+                            promise.completeExceptionally(exception);
+                        } else {
+                            mxBean.addCompactionEndOp(reader.getTopic(), true);
+                            promise.complete(ledgerId);
+                        }
+                    });
                 });
         return promise;
     }
 
     protected abstract CompletableFuture<Long> doCompaction(RawReader reader, BookKeeper bk);
+
+    public CompactorMXBean getStats() {
+        return this.mxBean;
+    }
 }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
new file mode 100644
index 0000000..54ca2e8
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBean.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.bookkeeper.common.annotation.InterfaceAudience;
+import org.apache.bookkeeper.common.annotation.InterfaceStability;
+
+/**
+ * JMX Bean interface for Compactor stats.
+ */
+@InterfaceAudience.LimitedPrivate
+@InterfaceStability.Stable
+public interface CompactorMXBean {
+
+    /**
+     * @return the removed event count of last compaction
+     */
+    long getLastCompactionRemovedEventCount(String topic);
+
+    /**
+     * @return the timestamp of last succeed compaction
+     */
+    long getLastCompactionSucceedTimestamp(String topic);
+
+    /**
+     * @return the timestamp of last failed compaction
+     */
+    long getLastCompactionFailedTimestamp(String topic);
+
+    /**
+     * @return the duration time of last compaction
+     */
+    long getLastCompactionDurationTimeInMills(String topic);
+
+    /**
+     *  Remove metrics about this topic.
+     * @param topic
+     */
+    void removeTopic(String topic);
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
new file mode 100644
index 0000000..05db2aa
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorMXBeanImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+
+public class CompactorMXBeanImpl implements CompactorMXBean {
+
+    private final ConcurrentHashMap<String, CompactRecord> compactRecordOps = new ConcurrentHashMap<>();
+
+    public void addCompactionRemovedEvent(String topic) {
+        compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).addCompactionRemovedEvent();
+    }
+
+    public void addCompactionStartOp(String topic) {
+        compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord()).reset();
+    }
+
+    public void addCompactionEndOp(String topic, boolean succeed) {
+        CompactRecord compactRecord = compactRecordOps.computeIfAbsent(topic, k -> new CompactRecord());
+        compactRecord.lastCompactionDurationTimeInMills = System.currentTimeMillis()
+                - compactRecord.lastCompactionStartTimeOp;
+        compactRecord.lastCompactionRemovedEventCount = compactRecord.lastCompactionRemovedEventCountOp.longValue();
+        if (succeed) {
+            compactRecord.lastCompactionSucceedTimestamp = System.currentTimeMillis();
+        } else {
+            compactRecord.lastCompactionFailedTimestamp = System.currentTimeMillis();
+        }
+    }
+
+    @Override
+    public long getLastCompactionRemovedEventCount(String topic) {
+        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionRemovedEventCount;
+    }
+
+    @Override
+    public long getLastCompactionSucceedTimestamp(String topic) {
+        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionSucceedTimestamp;
+    }
+
+    @Override
+    public long getLastCompactionFailedTimestamp(String topic) {
+        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionFailedTimestamp;
+    }
+
+    @Override
+    public long getLastCompactionDurationTimeInMills(String topic) {
+        return compactRecordOps.getOrDefault(topic, new CompactRecord()).lastCompactionDurationTimeInMills;
+    }
+
+    @Override
+    public void removeTopic(String topic) {
+        compactRecordOps.remove(topic);
+    }
+
+    static class CompactRecord {
+
+        private long lastCompactionRemovedEventCount = 0L;
+        private long lastCompactionSucceedTimestamp = 0L;
+        private long lastCompactionFailedTimestamp = 0L;
+        private long lastCompactionDurationTimeInMills = 0L;
+
+        private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
+        private long lastCompactionStartTimeOp;
+
+        public void addCompactionRemovedEvent() {
+            lastCompactionRemovedEventCountOp.increment();
+        }
+
+        public void reset() {
+            lastCompactionRemovedEventCountOp.reset();
+            lastCompactionStartTimeOp = System.currentTimeMillis();
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 0f0f981..49d121f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -124,18 +124,23 @@ public class TwoPhaseCompactor extends Compactor {
             try {
                 MessageId id = m.getMessageId();
                 boolean deletedMessage = false;
+                boolean replaceMessage = false;
                 if (RawBatchConverter.isReadableBatch(m)) {
                     try {
                         for (ImmutableTriple<MessageId, String, Integer> e : RawBatchConverter
                                 .extractIdsAndKeysAndSize(m)) {
                             if (e != null) {
                                 if (e.getRight() > 0) {
-                                    latestForKey.put(e.getMiddle(), e.getLeft());
+                                    MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
+                                    replaceMessage = old != null;
                                 } else {
                                     deletedMessage = true;
                                     latestForKey.remove(e.getMiddle());
                                 }
                             }
+                            if (replaceMessage || deletedMessage) {
+                                mxBean.addCompactionRemovedEvent(reader.getTopic());
+                            }
                         }
                     } catch (IOException ioe) {
                         log.info("Error decoding batch for message {}. Whole batch will be included in output",
@@ -145,14 +150,17 @@ public class TwoPhaseCompactor extends Compactor {
                     Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
                     if (keyAndSize != null) {
                         if (keyAndSize.getRight() > 0) {
-                            latestForKey.put(keyAndSize.getLeft(), id);
+                            MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
+                            replaceMessage = old != null;
                         } else {
                             deletedMessage = true;
                             latestForKey.remove(keyAndSize.getLeft());
                         }
                     }
+                    if (replaceMessage || deletedMessage) {
+                        mxBean.addCompactionRemovedEvent(reader.getTopic());
+                    }
                 }
-
                 MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
                 MessageId to = deletedMessage ? toMessageId.orElse(null) : id;
                 if (id.compareTo(lastMessageId) == 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
new file mode 100644
index 0000000..b865396
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorMXBeanImplTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.compaction;
+
+import org.apache.pulsar.broker.service.BrokerService;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Test(groups = "broker-compaction")
+public class CompactorMXBeanImplTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        CompactorMXBeanImpl mxBean = new CompactorMXBeanImpl();
+        String topic = "topic1";
+        mxBean.addCompactionStartOp(topic);
+        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+        mxBean.addCompactionRemovedEvent(topic);
+        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 0, 0);
+        mxBean.addCompactionEndOp(topic, true);
+        mxBean.addCompactionEndOp(topic, false);
+        assertEquals(mxBean.getLastCompactionRemovedEventCount(topic), 1, 0);
+        assertTrue(mxBean.getLastCompactionSucceedTimestamp(topic) > 0L);
+        assertTrue(mxBean.getLastCompactionFailedTimestamp(topic) > 0L);
+        assertTrue(mxBean.getLastCompactionDurationTimeInMills(topic) >= 0L);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index 0d1a95c..8197c4a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -84,7 +84,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         compactionScheduler.shutdownNow();
     }
 
-    private List<String> compactAndVerify(String topic, Map<String, byte[]> expected) throws Exception {
+    private List<String> compactAndVerify(String topic, Map<String, byte[]> expected, boolean checkMetrics) throws Exception {
         BookKeeper bk = pulsar.getBookKeeperClientFactory().create(
                 this.conf, null, null, Optional.empty(), null);
         Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
@@ -112,6 +112,16 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
                                 "Compacted version should match expected version");
             m.close();
         }
+        if (checkMetrics) {
+            long compactedTopicRemovedEventCount = compactor.getStats().getLastCompactionRemovedEventCount(topic);
+            long lastCompactSucceedTimestamp = compactor.getStats().getLastCompactionSucceedTimestamp(topic);
+            long lastCompactFailedTimestamp = compactor.getStats().getLastCompactionFailedTimestamp(topic);
+            long lastCompactDurationTimeInMills = compactor.getStats().getLastCompactionDurationTimeInMills(topic);
+            Assert.assertTrue(compactedTopicRemovedEventCount >= 1);
+            Assert.assertTrue(lastCompactSucceedTimestamp >= 1L);
+            Assert.assertTrue(lastCompactDurationTimeInMills >= 0L);
+            Assert.assertEquals(lastCompactFailedTimestamp, 0L);
+        }
         Assert.assertTrue(expected.isEmpty(), "All expected keys should have been found");
         return keys;
     }
@@ -140,7 +150,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
                     .send();
             expected.put(key, data);
         }
-        compactAndVerify(topic, expected);
+        compactAndVerify(topic, expected, true);
     }
 
     @Test
@@ -169,7 +179,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         expected.put("a", "A_2".getBytes());
         expected.put("b", "B_1".getBytes());
 
-        compactAndVerify(topic, new HashMap<>(expected));
+        compactAndVerify(topic, new HashMap<>(expected), false);
 
         producer.newMessage()
                 .key("b")
@@ -177,7 +187,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
                 .send();
         expected.put("b", "B_2".getBytes());
 
-        compactAndVerify(topic, expected);
+        compactAndVerify(topic, expected, false);
     }
 
     @Test
@@ -206,7 +216,7 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         expected.put("b", "B_1".getBytes());
         expected.put("c", "C_1".getBytes());
 
-        List<String> keyOrder = compactAndVerify(topic, expected);
+        List<String> keyOrder = compactAndVerify(topic, expected, false);
 
         Assert.assertEquals(keyOrder, Lists.newArrayList("c", "b", "a"));
     }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java
new file mode 100644
index 0000000..0500017
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/CompactionStats.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Statistics about compaction.
+ */
+public interface CompactionStats {
+
+    /** The removed event count of last compaction. */
+    long getLastCompactionRemovedEventCount();
+
+    /** The timestamp of last succeed compaction. */
+    long getLastCompactionSucceedTimestamp();
+
+    /** The timestamp of last failed compaction. */
+    long getLastCompactionFailedTimestamp();
+
+    /** The duration time of last compaction. */
+    long getLastCompactionDurationTimeInMills();
+}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index afc1810..dc1964e 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -85,4 +85,7 @@ public interface TopicStats {
 
     /** The serialized size of non-contiguous deleted messages ranges. */
     int getNonContiguousDeletedMessagesRangesSerializedSize();
+
+    /** The compaction stats. */
+    CompactionStats getCompaction();
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java
new file mode 100644
index 0000000..e187f6a
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/CompactionStatsImpl.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data.stats;
+
+import lombok.Data;
+import org.apache.pulsar.common.policies.data.CompactionStats;
+/**
+ * Statistics about compaction.
+ */
+@Data
+public class CompactionStatsImpl implements CompactionStats {
+
+    /** The removed event count of last compaction. */
+    public long lastCompactionRemovedEventCount;
+
+    /** The timestamp of last succeed compaction. */
+    public long lastCompactionSucceedTimestamp;
+
+    /** The timestamp of last failed compaction. */
+    public long lastCompactionFailedTimestamp;
+
+    /** The duration time of last compaction. */
+    public long lastCompactionDurationTimeInMills;
+
+    public void reset() {
+        this.lastCompactionRemovedEventCount = 0;
+        this.lastCompactionSucceedTimestamp = 0;
+        this.lastCompactionFailedTimestamp = 0;
+        this.lastCompactionDurationTimeInMills = 0;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
index 7ad8e83..f3b3944 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.TreeMap;
 
 /**
@@ -114,6 +113,9 @@ public class TopicStatsImpl implements TopicStats {
     /** The serialized size of non-contiguous deleted messages ranges. */
     public int nonContiguousDeletedMessagesRangesSerializedSize;
 
+    /** The compaction stats */
+    public CompactionStatsImpl compaction;
+
     public List<? extends PublisherStats> getPublishers() {
         return publishers;
     }
@@ -130,6 +132,7 @@ public class TopicStatsImpl implements TopicStats {
         this.publishers = new ArrayList<>();
         this.subscriptions = new HashMap<>();
         this.replication = new TreeMap<>();
+        this.compaction = new CompactionStatsImpl();
     }
 
     public void reset() {
@@ -157,6 +160,7 @@ public class TopicStatsImpl implements TopicStats {
         this.lastOffloadLedgerId = 0;
         this.lastOffloadFailureTimeStamp = 0;
         this.lastOffloadSuccessTimeStamp = 0;
+        this.compaction.reset();
     }
 
     // if the stats are added for the 1st time, we will need to make a copy of these stats and add it to the current
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
index fa67fb0..9c4c3b2 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/PersistentTopicStatsTest.java
@@ -60,6 +60,10 @@ public class PersistentTopicStatsTest {
         assertEquals(topicStats.publishers.size(), 1);
         assertEquals(topicStats.subscriptions.size(), 1);
         assertEquals(topicStats.replication.size(), 1);
+        assertEquals(topicStats.compaction.lastCompactionRemovedEventCount, 0);
+        assertEquals(topicStats.compaction.lastCompactionSucceedTimestamp, 0);
+        assertEquals(topicStats.compaction.lastCompactionFailedTimestamp, 0);
+        assertEquals(topicStats.compaction.lastCompactionDurationTimeInMills, 0);
         topicStats.reset();
         assertEquals(topicStats.msgRateIn, 0.0);
         assertEquals(topicStats.msgThroughputIn, 0.0);

[pulsar] 05/05: Fix the checkstyle issue during the cherry-pick

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 19b2b69983ed4ac141fe037dfbbc43ac50267fca
Author: penghui <pe...@apache.org>
AuthorDate: Thu Sep 2 11:04:02 2021 +0800

    Fix the checkstyle issue during the cherry-pick
---
 .../src/main/java/org/apache/pulsar/broker/service/BrokerService.java  | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 74fde84..2ebaee2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -165,9 +165,6 @@ import org.apache.pulsar.common.util.netty.ChannelFutures;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.common.util.netty.NettyFutureUtil;
 import org.apache.pulsar.compaction.Compactor;
-import org.apache.pulsar.metadata.api.MetadataCache;
-import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;