Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/08/22 01:02:18 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request #11739: Expose compaction metrics to Prometheus

Technoboy- opened a new pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739


   ### Motivation
   Since #11564 added compaction metrics to the CLI, it would be useful to expose the related metrics to Prometheus as well.
   The metrics use the prefix `brk_cp_`, where `cp` is short for compaction.
   
   - brk_cp_totalCompactionCount: the total number of compactions.
   - brk_cp_totalCompactionRemovedEventCount: the total number of events removed by compaction.
   - brk_cp_totalCompactionSucceedCount: the total number of successful compactions.
   - brk_cp_totalCompactionFailedCount: the total number of failed compactions.
   
   The above metrics are reported at the namespace level.
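
   As a rough illustration of what "namespace level" means here, the sketch below sums hypothetical per-topic counters into one value per namespace. It is not the PR's implementation: the class and method names are made up, and it assumes fully qualified topic names of the form `persistent://tenant/namespace/topic`.

```java
import java.util.HashMap;
import java.util.Map;

class NamespaceAggregationSketch {

    // Extracts "tenant/namespace" from "persistent://tenant/namespace/topic".
    // Assumption: the topic name is fully qualified; anything else is rejected.
    static String namespaceOf(String topic) {
        int schemeEnd = topic.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Not a fully qualified topic: " + topic);
        }
        // Limit of 3 keeps any '/' inside the local topic name in the last part.
        String[] parts = topic.substring(schemeEnd + 3).split("/", 3);
        if (parts.length < 3) {
            throw new IllegalArgumentException("Not a fully qualified topic: " + topic);
        }
        return parts[0] + "/" + parts[1];
    }

    // Sums per-topic counters (for example, compaction counts) by namespace.
    static Map<String, Long> sumByNamespace(Map<String, Long> perTopic) {
        Map<String, Long> byNamespace = new HashMap<>();
        for (Map.Entry<String, Long> entry : perTopic.entrySet()) {
            byNamespace.merge(namespaceOf(entry.getKey()), entry.getValue(), Long::sum);
        }
        return byNamespace;
    }
}
```

   With this shape, a namespace containing many compacted topics yields a single time series per metric rather than one per topic.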
   
   ### Modifications
   
   - Add CompactionMetrics
   
   ### Documentation
   
   #### For contributor
   
   Documentation is required for this change.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Technoboy- commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-907157638


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-907098798


   > @Technoboy- it could also be convenient to allow users to aggregate these metrics at a namespace level, but not a topic level. This would especially be helpful for users with many compacted topics.
   
   Yes, the current CompactionMetrics is already aggregated at the namespace level.





[GitHub] [pulsar] Technoboy- commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-908821889









[GitHub] [pulsar] Technoboy- commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r695747911



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2120,6 +2115,15 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
         return statFuture;
     }
 
+    public Optional<CompactedTopicImpl.CompactedTopicContext> getCompactedTopicContext() {

Review comment:
       Yes. I have made CompactedTopicContext a public class.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r698556954



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -1999,6 +1999,11 @@
             doc = "If true, export managed cursor metrics"
     )
     private boolean exposeManagedCursorMetricsInPrometheus = false;
+    @FieldContext(
+            category = CATEGORY_METRICS,
+            doc = "If true, export compaction metrics"
+    )
+    private boolean exposeCompactionMetricsInPrometheus = false;

Review comment:
       Ok, updated.







[GitHub] [pulsar] Technoboy- commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-906013201


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r697781074



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
##########
@@ -242,24 +238,11 @@ protected void populateMaxMap(Map<String, Long> map, String mkey, long value) {
      */
     protected void populateDimensionMap(Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap, Metrics metrics,
             ManagedLedgerImpl ledger) {
-        if (!ledgersByDimensionMap.containsKey(metrics)) {
-            // create new list
-            ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger));
-        } else {
-            // add to collection
-            ledgersByDimensionMap.get(metrics).add(ledger);
-        }
+        ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList(ledger)).add(ledger);
     }
 
     protected void populateDimensionMap(Map<Metrics, List<TopicStats>> topicsStatsByDimensionMap,
             Metrics metrics, TopicStats destStats) {
-        if (!topicsStatsByDimensionMap.containsKey(metrics)) {
-            // create new list
-            topicsStatsByDimensionMap.put(metrics, Lists.newArrayList(destStats));
-        } else {
-            // add to collection
-            topicsStatsByDimensionMap.get(metrics).add(destStats);
-        }
-
+        topicsStatsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList(destStats)).add(destStats);

Review comment:
       Done.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java
##########
@@ -159,10 +161,6 @@ public CompactionMetrics(PulsarService pulsar) {
     }
 
     private void populateDimensionMap(Map<Metrics, List<String>> topicsByDimensionMap, Metrics metrics, String topic) {
-        if (!topicsByDimensionMap.containsKey(metrics)) {
-            topicsByDimensionMap.put(metrics, Lists.newArrayList(topic));
-        } else {
-            topicsByDimensionMap.get(metrics).add(topic);
-        }
+        topicsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList(topic)).add(topic);

Review comment:
       Yes, thanks. 

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
##########
@@ -242,24 +238,11 @@ protected void populateMaxMap(Map<String, Long> map, String mkey, long value) {
      */
     protected void populateDimensionMap(Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap, Metrics metrics,
             ManagedLedgerImpl ledger) {
-        if (!ledgersByDimensionMap.containsKey(metrics)) {
-            // create new list
-            ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger));
-        } else {
-            // add to collection
-            ledgersByDimensionMap.get(metrics).add(ledger);
-        }
+        ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList(ledger)).add(ledger);

Review comment:
       Done

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
##########
@@ -214,11 +214,7 @@ protected Metrics createMetricsByDimension(String namespace, String fromClusterN
     }
 
     protected void populateAggregationMap(Map<String, List<Double>> map, String mkey, double value) {
-        if (!map.containsKey(mkey)) {
-            map.put(mkey, Lists.newArrayList(value));
-        } else {
-            map.get(mkey).add(value);
-        }
+        map.computeIfAbsent(mkey, __ -> Lists.newArrayList(value)).add(value);

Review comment:
       Done







[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r697509325



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
##########
@@ -242,24 +238,11 @@ protected void populateMaxMap(Map<String, Long> map, String mkey, long value) {
      */
     protected void populateDimensionMap(Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap, Metrics metrics,
             ManagedLedgerImpl ledger) {
-        if (!ledgersByDimensionMap.containsKey(metrics)) {
-            // create new list
-            ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger));
-        } else {
-            // add to collection
-            ledgersByDimensionMap.get(metrics).add(ledger);
-        }
+        ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList(ledger)).add(ledger);

Review comment:
       Great catch. Same comment about needing an empty array list to start.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java
##########
@@ -159,10 +161,6 @@ public CompactionMetrics(PulsarService pulsar) {
     }
 
     private void populateDimensionMap(Map<Metrics, List<String>> topicsByDimensionMap, Metrics metrics, String topic) {
-        if (!topicsByDimensionMap.containsKey(metrics)) {
-            topicsByDimensionMap.put(metrics, Lists.newArrayList(topic));
-        } else {
-            topicsByDimensionMap.get(metrics).add(topic);
-        }
+        topicsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList(topic)).add(topic);

Review comment:
       Same comment about needing an empty array list to start.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
##########
@@ -242,24 +238,11 @@ protected void populateMaxMap(Map<String, Long> map, String mkey, long value) {
      */
     protected void populateDimensionMap(Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap, Metrics metrics,
             ManagedLedgerImpl ledger) {
-        if (!ledgersByDimensionMap.containsKey(metrics)) {
-            // create new list
-            ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger));
-        } else {
-            // add to collection
-            ledgersByDimensionMap.get(metrics).add(ledger);
-        }
+        ledgersByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList(ledger)).add(ledger);
     }
 
     protected void populateDimensionMap(Map<Metrics, List<TopicStats>> topicsStatsByDimensionMap,
             Metrics metrics, TopicStats destStats) {
-        if (!topicsStatsByDimensionMap.containsKey(metrics)) {
-            // create new list
-            topicsStatsByDimensionMap.put(metrics, Lists.newArrayList(destStats));
-        } else {
-            // add to collection
-            topicsStatsByDimensionMap.get(metrics).add(destStats);
-        }
-
+        topicsStatsByDimensionMap.computeIfAbsent(metrics, __ -> Lists.newArrayList(destStats)).add(destStats);

Review comment:
       Great catch. Same comment about needing an empty array list to start.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/AbstractMetrics.java
##########
@@ -214,11 +214,7 @@ protected Metrics createMetricsByDimension(String namespace, String fromClusterN
     }
 
     protected void populateAggregationMap(Map<String, List<Double>> map, String mkey, double value) {
-        if (!map.containsKey(mkey)) {
-            map.put(mkey, Lists.newArrayList(value));
-        } else {
-            map.get(mkey).add(value);
-        }
+        map.computeIfAbsent(mkey, __ -> Lists.newArrayList(value)).add(value);

Review comment:
       I think we'll want to start with an empty `ArrayList`. Otherwise, the first `value` will be added twice.
   ```suggestion
           map.computeIfAbsent(mkey, __ -> Lists.newArrayList()).add(value);
   ```
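
To illustrate the double add described above, here is a small standalone sketch that uses only `java.util`. The class and method names are made up for illustration and are not part of the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ComputeIfAbsentDemo {

    // Seeded variant: the mapping function creates a list that already
    // contains `value`, and the chained add(value) appends it again.
    static void addSeeded(Map<String, List<Double>> map, String key, double value) {
        map.computeIfAbsent(key, k -> {
            List<Double> seeded = new ArrayList<>();
            seeded.add(value);
            return seeded;
        }).add(value);
    }

    // Suggested variant: start from an empty list so `value` is added once.
    static void addEmpty(Map<String, List<Double>> map, String key, double value) {
        map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    public static void main(String[] args) {
        Map<String, List<Double>> seeded = new HashMap<>();
        addSeeded(seeded, "m", 1.0);
        Map<String, List<Double>> empty = new HashMap<>();
        addEmpty(empty, "m", 1.0);
        System.out.println(seeded.get("m")); // prints [1.0, 1.0]
        System.out.println(empty.get("m"));  // prints [1.0]
    }
}
```

The duplication only affects the first value for a key. On later calls the key is already present, so the mapping function is not invoked and both variants behave the same.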







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r698560944



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1906,14 +1908,19 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
         stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
         stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
         Optional<CompactorMXBean> mxBean = getCompactorMXBean();
-        stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
-                stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
-        stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
-                stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
-        stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
-                stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
-        stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
-                stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);
+
+        stats.compaction.lastCompactionRemovedEventCount = 0L;
+        stats.compaction.lastCompactionSucceedTimestamp = 0L;
+        stats.compaction.lastCompactionFailedTimestamp = 0L;
+        stats.compaction.lastCompactionDurationTimeInMills = 0L;

Review comment:
       Done







[GitHub] [pulsar] Anonymitaet commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-911083313


   Hi @sijia-w, in this PR, @Technoboy- has already added docs to `master`.
   
   - Could you please add the same contents to `2.8.2`?
   - If you find any inaccurate expressions or grammar issues, you can modify the docs based on technical writing rules.
   
   Thanks!





[GitHub] [pulsar] hangc0276 commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r698159643



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -1906,14 +1908,19 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
         stats.lastOffloadSuccessTimeStamp = ledger.getLastOffloadedSuccessTimestamp();
         stats.lastOffloadFailureTimeStamp = ledger.getLastOffloadedFailureTimestamp();
         Optional<CompactorMXBean> mxBean = getCompactorMXBean();
-        stats.compaction.lastCompactionRemovedEventCount = mxBean.map(stat ->
-                stat.getLastCompactionRemovedEventCount(topic)).orElse(0L);
-        stats.compaction.lastCompactionSucceedTimestamp = mxBean.map(stat ->
-                stat.getLastCompactionSucceedTimestamp(topic)).orElse(0L);
-        stats.compaction.lastCompactionFailedTimestamp = mxBean.map(stat ->
-                stat.getLastCompactionFailedTimestamp(topic)).orElse(0L);
-        stats.compaction.lastCompactionDurationTimeInMills = mxBean.map(stat ->
-                stat.getLastCompactionDurationTimeInMills(topic)).orElse(0L);
+
+        stats.compaction.lastCompactionRemovedEventCount = 0L;
+        stats.compaction.lastCompactionSucceedTimestamp = 0L;
+        stats.compaction.lastCompactionFailedTimestamp = 0L;
+        stats.compaction.lastCompactionDurationTimeInMills = 0L;

Review comment:
       ```java
   stats.compaction.lastCompactionRemovedEventCount = 0L;
   stats.compaction.lastCompactionSucceedTimestamp = 0L;
   stats.compaction.lastCompactionFailedTimestamp = 0L;
   stats.compaction.lastCompactionDurationTimeInMills = 0L;
   ```
   Could these four assignments be replaced with `stats.compaction.reset();`?
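
A minimal sketch of what such a `reset()` could look like, assuming a plain stats holder with the four fields shown in the diff. The class name below is hypothetical, and the actual stats class in Pulsar may be shaped differently.

```java
// Hypothetical stand-in for the object behind `stats.compaction`.
class CompactionStatsSketch {
    long lastCompactionRemovedEventCount;
    long lastCompactionSucceedTimestamp;
    long lastCompactionFailedTimestamp;
    long lastCompactionDurationTimeInMills;

    // Clears every field in one place, so callers do not have to repeat
    // the four assignments and cannot forget one when a field is added.
    void reset() {
        lastCompactionRemovedEventCount = 0L;
        lastCompactionSucceedTimestamp = 0L;
        lastCompactionFailedTimestamp = 0L;
        lastCompactionDurationTimeInMills = 0L;
    }
}
```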







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r697328641



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.metrics;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.CompactorMXBeanImpl;
+
+@Slf4j
+public class CompactionMetrics extends AbstractMetrics {
+
+    private static final Buckets
+            BRK_COMPACTION_LATENCY_BUCKETS = new Buckets("brk_compaction_latencyBuckets",
+            ENTRY_LATENCY_BUCKETS_MS);
+
+    private List<Metrics> metricsCollection;
+    private Map<String, Double> tempAggregatedMetricsMap;
+    private Map<Metrics, List<String>> topicsByDimensionMap;
+    private Optional<CompactorMXBean> stats;
+    private int statsPeriodSeconds;
+
+    public CompactionMetrics(PulsarService pulsar) {
+        super(pulsar);
+        this.metricsCollection = Lists.newArrayList();
+        this.topicsByDimensionMap = Maps.newHashMap();
+        this.tempAggregatedMetricsMap = Maps.newHashMap();
+        this.stats = getCompactorMXBean();
+        this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
+                .getConfig().getStatsPeriodSeconds();
+    }
+
+    @Override
+    public synchronized List<Metrics> generate() {
+        return aggregate(groupTopicsByDimension());
+    }
+
+
+    /**
+     * Aggregation by namespace.
+     *
+     * @return List<Metrics>
+     */
+    private List<Metrics> aggregate(Map<Metrics, List<String>> topicsByDimension) {
+        metricsCollection.clear();
+        if (stats.isPresent()) {
+            CompactorMXBeanImpl compactorMXBean = (CompactorMXBeanImpl) stats.get();
+            for (Map.Entry<Metrics, List<String>> entry : topicsByDimension.entrySet()) {
+                Metrics metrics = entry.getKey();
+                List<String> topics = entry.getValue();
+                tempAggregatedMetricsMap.clear();
+                for (String topic : topics) {
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_removedEventCount",
+                            compactorMXBean.getCompactionRemovedEventCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_succeedCount",
+                            compactorMXBean.getCompactionSucceedCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_failedCount",
+                            compactorMXBean.getCompactionFailedCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_durationTimeInMills",
+                            compactorMXBean.getCompactionDurationTimeInMills(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_readThroughput",
+                            compactorMXBean.getCompactionReadThroughput(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_writeThroughput",
+                            compactorMXBean.getCompactionWriteThroughput(topic));

Review comment:
       OK, good suggestion. I have updated the related implementation.
   Many thanks for your kind review.







[GitHub] [pulsar] michaeljmarshall commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r696990968



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1594,6 +1594,8 @@ public void forEachTopic(Consumer<Topic> consumer) {
         });
     }
 
+
+

Review comment:
       Please remove unnecessary whitespace.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.metrics;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.CompactorMXBeanImpl;
+
+@Slf4j
+public class CompactionMetrics extends AbstractMetrics {
+
+    private static final Buckets
+            BRK_COMPACTION_LATENCY_BUCKETS = new Buckets("brk_compaction_latencyBuckets",
+            ENTRY_LATENCY_BUCKETS_MS);
+
+    private List<Metrics> metricsCollection;
+    private Map<String, Double> tempAggregatedMetricsMap;
+    private Map<Metrics, List<String>> topicsByDimensionMap;
+    private Optional<CompactorMXBean> stats;
+    private int statsPeriodSeconds;
+
+    public CompactionMetrics(PulsarService pulsar) {
+        super(pulsar);
+        this.metricsCollection = Lists.newArrayList();
+        this.topicsByDimensionMap = Maps.newHashMap();
+        this.tempAggregatedMetricsMap = Maps.newHashMap();
+        this.stats = getCompactorMXBean();
+        this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
+                .getConfig().getStatsPeriodSeconds();
+    }
+
+    @Override
+    public synchronized List<Metrics> generate() {
+        return aggregate(groupTopicsByDimension());
+    }
+
+
+    /**
+     * Aggregation by namespace.
+     *
+     * @return List<Metrics>
+     */
+    private List<Metrics> aggregate(Map<Metrics, List<String>> topicsByDimension) {
+        metricsCollection.clear();
+        if (stats.isPresent()) {
+            CompactorMXBeanImpl compactorMXBean = (CompactorMXBeanImpl) stats.get();
+            for (Map.Entry<Metrics, List<String>> entry : topicsByDimension.entrySet()) {
+                Metrics metrics = entry.getKey();
+                List<String> topics = entry.getValue();
+                tempAggregatedMetricsMap.clear();
+                for (String topic : topics) {
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_removedEventCount",
+                            compactorMXBean.getCompactionRemovedEventCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_succeedCount",
+                            compactorMXBean.getCompactionSucceedCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_failedCount",
+                            compactorMXBean.getCompactionFailedCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_durationTimeInMills",
+                            compactorMXBean.getCompactionDurationTimeInMills(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_readThroughput",
+                            compactorMXBean.getCompactionReadThroughput(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_writeThroughput",
+                            compactorMXBean.getCompactionWriteThroughput(topic));

Review comment:
       The `CompactorMXBeanImpl` has a single map from `topic` string to `CompactionRecord`. Instead of performing many get calls on the same map for the same topic, perhaps it would be better to get the `CompactionRecord` for the topic once and then get the relevant metrics from that `CompactionRecord` class. Then, the `CompactorMXBean` would really only need a `getCompactionRecordForTopic` method and a `removeTopic` method. All other metrics would be retrieved from the returned `CompactionRecord` object. If you agree with that change, it would also make sense to refactor the usage in the `PersistentTopic` method `getStats` to prevent the many get calls on the same map for the same topic.
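
A minimal sketch of the single-lookup idea described above, not the actual Pulsar code. `CompactorStatsSketch`, the fields of its `CompactionRecord`, and the `recordFor` helper are hypothetical stand-ins; only the method names `getCompactionRecordForTopic` and `removeTopic` come from the comment itself.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the bean; the real CompactorMXBeanImpl may differ.
final class CompactorStatsSketch {

    // Hypothetical per-topic record carrying only three of the counters.
    static final class CompactionRecord {
        long removedEventCount;
        long succeedCount;
        long failedCount;
    }

    private final Map<String, CompactionRecord> records = new ConcurrentHashMap<>();

    // Demo helper (assumption): creates the record for a topic on first use.
    CompactionRecord recordFor(String topic) {
        return records.computeIfAbsent(topic, t -> new CompactionRecord());
    }

    Optional<CompactionRecord> getCompactionRecordForTopic(String topic) {
        return Optional.ofNullable(records.get(topic));
    }

    void removeTopic(String topic) {
        records.remove(topic);
    }

    // One map lookup per topic; every counter is then read from the same record.
    static long[] readCounts(CompactorStatsSketch stats, String topic) {
        return stats.getCompactionRecordForTopic(topic)
                .map(r -> new long[] {r.removedEventCount, r.succeedCount, r.failedCount})
                .orElseGet(() -> new long[] {0L, 0L, 0L});
    }
}
```

With this shape, the aggregation loop would perform one lookup per topic instead of one per metric, and a topic without a record simply contributes zeros.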

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.metrics;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.CompactorMXBeanImpl;
+
+@Slf4j
+public class CompactionMetrics extends AbstractMetrics {
+
+    private static final Buckets
+            BRK_COMPACTION_LATENCY_BUCKETS = new Buckets("brk_compaction_latencyBuckets",
+            ENTRY_LATENCY_BUCKETS_MS);
+
+    private List<Metrics> metricsCollection;
+    private Map<String, Double> tempAggregatedMetricsMap;
+    private Map<Metrics, List<String>> topicsByDimensionMap;
+    private Optional<CompactorMXBean> stats;
+    private int statsPeriodSeconds;
+
+    public CompactionMetrics(PulsarService pulsar) {
+        super(pulsar);
+        this.metricsCollection = Lists.newArrayList();
+        this.topicsByDimensionMap = Maps.newHashMap();
+        this.tempAggregatedMetricsMap = Maps.newHashMap();
+        this.stats = getCompactorMXBean();
+        this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
+                .getConfig().getStatsPeriodSeconds();
+    }
+
+    @Override
+    public synchronized List<Metrics> generate() {
+        return aggregate(groupTopicsByDimension());
+    }
+
+
+    /**
+     * Aggregation by namespace.
+     *
+     * @return List<Metrics>
+     */
+    private List<Metrics> aggregate(Map<Metrics, List<String>> topicsByDimension) {
+        metricsCollection.clear();
+        if (stats.isPresent()) {
+            CompactorMXBeanImpl compactorMXBean = (CompactorMXBeanImpl) stats.get();
+            for (Map.Entry<Metrics, List<String>> entry : topicsByDimension.entrySet()) {
+                Metrics metrics = entry.getKey();
+                List<String> topics = entry.getValue();
+                tempAggregatedMetricsMap.clear();
+                for (String topic : topics) {
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_removedEventCount",
+                            compactorMXBean.getCompactionRemovedEventCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_succeedCount",
+                            compactorMXBean.getCompactionSucceedCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_failedCount",
+                            compactorMXBean.getCompactionFailedCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_durationTimeInMills",
+                            compactorMXBean.getCompactionDurationTimeInMills(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_readThroughput",
+                            compactorMXBean.getCompactionReadThroughput(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_writeThroughput",
+                            compactorMXBean.getCompactionWriteThroughput(topic));
+
+                    BRK_COMPACTION_LATENCY_BUCKETS.populateBucketEntries(tempAggregatedMetricsMap,
+                            compactorMXBean.getCompactionLatencyBuckets(topic),
+                            statsPeriodSeconds);
+
+                    CompletableFuture<Optional<Topic>> topicHandle = pulsar.getBrokerService().getTopicIfExists(topic);
+                    Optional<Topic> topicOp = BrokerService.extractTopic(topicHandle);
+                    if (topicOp.isPresent()) {
+                        PersistentTopic persistentTopic = (PersistentTopic) topicOp.get();
+                        Optional<CompactedTopicContext> compactedTopicContext = persistentTopic
+                                .getCompactedTopicContext();
+                        if (compactedTopicContext.isPresent()) {
+                            LedgerHandle ledger = compactedTopicContext.get().getLedger();
+                            long entries = ledger.getLastAddConfirmed() + 1;
+                            long size = ledger.getLength();
+
+                            populateAggregationMapWithSum(tempAggregatedMetricsMap,
+                                    "brk_compaction_compactedEntriesCount", entries);
+
+                            populateAggregationMapWithSum(tempAggregatedMetricsMap,
+                                    "brk_compaction_compactedEntriesSize", size);
+                        }
+                    }
+                }
+                for (Map.Entry<String, Double> ma : tempAggregatedMetricsMap.entrySet()) {
+                    metrics.put(ma.getKey(), ma.getValue());
+                }
+                metricsCollection.add(metrics);
+            }
+            compactorMXBean.reset();
+        }
+
+        return metricsCollection;
+    }
+
+    private Map<Metrics, List<String>> groupTopicsByDimension() {
+        topicsByDimensionMap.clear();
+        if (stats.isPresent()) {
+            CompactorMXBeanImpl compactorMXBean = (CompactorMXBeanImpl) stats.get();
+            tempAggregatedMetricsMap.clear();
+            for (String topic : compactorMXBean.getTopics()) {
+                String namespace = TopicName.get(topic).getNamespace();
+                Metrics metrics = super.createMetricsByDimension(namespace);
+                populateDimensionMap(topicsByDimensionMap, metrics, topic);
+            }
+        }
+        return topicsByDimensionMap;
+    }
+
+    private Optional<CompactorMXBean> getCompactorMXBean() {
+        Compactor compactor = null;
+        try {
+            compactor = pulsar.getCompactor(false);
+        } catch (PulsarServerException e) {
+            log.error("get compactor error", e);
+        }
+        return Optional.ofNullable(compactor).map(c -> c.getStats());
+    }
+
+    private void populateDimensionMap(Map<Metrics, List<String>> topicsByDimensionMap, Metrics metrics, String topic) {
+        if (!topicsByDimensionMap.containsKey(metrics)) {
+            topicsByDimensionMap.put(metrics, Lists.newArrayList(topic));
+        } else {
+            topicsByDimensionMap.get(metrics).add(topic);
+        }

Review comment:
       I believe the following is equivalent to yours, and it will save a `get` operation in the case where the `metrics` object is already in the map, which will happen for all topics but the first in each namespace.
   ```suggestion
           topicsByDimensionMap.computeIfAbsent(metrics, __ -> new ArrayList<>()).add(topic);
   ```
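
To illustrate the claimed equivalence, here is a small self-contained sketch that applies both forms to plain maps. The class and method names are hypothetical, and a generic key type stands in for `Metrics`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper class; K stands in for the Metrics key used in the PR.
final class DimensionMapSketch {

    // Original form: containsKey, then either put or get followed by add.
    static <K> void addWithContainsKey(Map<K, List<String>> map, K key, String topic) {
        if (!map.containsKey(key)) {
            List<String> topics = new ArrayList<>();
            topics.add(topic);
            map.put(key, topics);
        } else {
            map.get(key).add(topic);
        }
    }

    // Suggested form: computeIfAbsent returns the existing or newly created list.
    static <K> void addWithComputeIfAbsent(Map<K, List<String>> map, K key, String topic) {
        map.computeIfAbsent(key, unused -> new ArrayList<>()).add(topic);
    }
}
```

Both methods leave the map in the same state for the same sequence of calls; the second needs only one map operation per topic.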







[GitHub] [pulsar] Technoboy- commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-908852676


   > Please add a test to check the final prometheus metric output result in `PrometheusMetricsTest`
   
   Yes, done. Thanks for reviewing.





[GitHub] [pulsar] codelipenghui merged pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739


   





[GitHub] [pulsar] Anonymitaet commented on pull request #11739: [WIP]Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-903367693


   Thanks for your contribution. Please do not forget to update docs later. And you can ping me to review the docs, thanks.





[GitHub] [pulsar] Technoboy- commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-906196986


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-905498946


   > very good.
   > 
   > I left one comment about exposing an "Impl" class in PersistentTopic.
   > 
   > PTAL
   
   Thanks for reviewing.





[GitHub] [pulsar] Technoboy- edited a comment on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- edited a comment on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-907098798


   > @Technoboy- it could also be convenient to allow users to aggregate these metrics at a namespace level, but not a topic level. This would especially be helpful for users with many compacted topics.
   
   Yes, the current `CompactionMetrics` already aggregates at the namespace level. Many thanks for your kind review.





[GitHub] [pulsar] codelipenghui commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r698108046



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -1999,6 +1999,11 @@
             doc = "If true, export managed cursor metrics"
     )
     private boolean exposeManagedCursorMetricsInPrometheus = false;
+    @FieldContext(
+            category = CATEGORY_METRICS,
+            doc = "If true, export compaction metrics"
+    )
+    private boolean exposeCompactionMetricsInPrometheus = false;

Review comment:
       In my opinion, we don't need this new flag to enable or disable the `feature` metrics.
   The existing metrics control flags are organized by level:
   
   ```
   # Enable topic level metrics
   exposeTopicLevelMetricsInPrometheus=true
   
   # Enable consumer level metrics. default is false
   exposeConsumerLevelMetricsInPrometheus=false
   
   # Enable producer level metrics. default is false
   exposeProducerLevelMetricsInPrometheus=false
   
   # Enable managed ledger metrics (aggregated by namespace). default is false
   exposeManagedLedgerMetricsInPrometheus=true
   
   # Enable cursor level metrics. default is false
   exposeManagedCursorMetricsInPrometheus=false
   ```
   
   If we introduce feature-based metrics control, the configuration becomes more complex, for example by requiring a way to enable each `feature`'s metrics at each level.
   
   My suggestion: if users enable topic-level metrics and the topic has been compacted or is being compacted, we should expose the compaction metrics at the topic level. If users disable topic-level metrics, we should expose the aggregated compaction metrics at the namespace level.
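
A rough sketch of that level-based selection, under stated assumptions: the class, the `select` method, and the simplified topic-name parsing are all hypothetical, and the boolean parameter merely stands in for `exposeTopicLevelMetricsInPrometheus`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; none of these names exist in Pulsar.
final class CompactionMetricLevelSketch {

    // Simplified parsing (assumption): everything between "://" and the last '/'.
    static String namespaceOf(String topic) {
        String path = topic.substring(topic.indexOf("://") + 3);
        return path.substring(0, path.lastIndexOf('/'));
    }

    // Keys the output by topic when topic-level metrics are enabled,
    // otherwise sums the per-topic values into one entry per namespace.
    static Map<String, Long> select(Map<String, Long> removedEventsByTopic, boolean exposeTopicLevel) {
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<String, Long> e : removedEventsByTopic.entrySet()) {
            String key = exposeTopicLevel ? e.getKey() : namespaceOf(e.getKey());
            out.merge(key, e.getValue(), Long::sum);
        }
        return out;
    }
}
```

The point of the design is that a single existing flag decides the granularity, so no compaction-specific switch is needed.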







[GitHub] [pulsar] eolivelli commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r695532911



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/CompactionMetricsTest.java
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.stats.metrics.CompactionMetrics;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.stats.Metrics;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Random;
+
+@Test(groups = "broker")
+public class CompactionMetricsTest extends BrokerTestBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testCompactionMetrics() throws Exception {
+        pulsar.getCompactor(true);
+        CompactionMetrics metrics = new CompactionMetrics(pulsar);
+
+        final String compactionRemovedEventCountKey = "brk_compaction_removedEventCount";
+        final String compactionSucceedCountKey = "brk_compaction_succeedCount";
+        final String compactionFailedCountKey = "brk_compaction_failedCount";
+        final String compactionDurationTimeInMillsKey = "brk_compaction_durationTimeInMills";
+        final String compactionReadThroughputKey = "brk_compaction_readThroughput";
+        final String compactionWriteThroughputKey = "brk_compaction_writeThroughput";
+        List<Metrics> list1 = metrics.generate();
+        Assert.assertTrue(list1.isEmpty());
+
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        final int numMessages = 1000;
+        final int maxKeys = 10;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        Random r = new Random(0);
+
+        for (int j = 0; j < numMessages; j++) {
+            int keyIndex = r.nextInt(maxKeys);
+            String key = "key" + keyIndex;
+            byte[] data = ("my-message-" + key + "-" + j).getBytes();
+            producer.newMessage()
+                    .key(key)
+                    .value(data)
+                    .send();
+        }
+
+        pulsar.getCompactor(true).compact(topic).get();
+        List<Metrics> list2 = metrics.generate();
+        System.out.println(list2);

Review comment:
       nit: please remove the `System.out` call or use a logger instead

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
##########
@@ -2120,6 +2115,15 @@ public TopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBa
         return statFuture;
     }
 
+    public Optional<CompactedTopicImpl.CompactedTopicContext> getCompactedTopicContext() {

Review comment:
       Exposing something named `*Impl` is a code smell.
   
   Can we create an interface for `CompactedTopicContext`?
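
One possible shape for such an interface, as a sketch only: `CompactedTopicContextView`, `CompactedTopicSketch`, and their members are hypothetical names, and the entry count mirrors the `getLastAddConfirmed() + 1` computation used in `CompactionMetrics`.

```java
import java.util.Optional;

// Hypothetical read-only view that callers outside the compaction package would see.
interface CompactedTopicContextView {
    long entryCount();
    long sizeInBytes();
}

// Hypothetical stand-in for the implementation class that owns the context.
final class CompactedTopicSketch {

    // The concrete context stays private; only the interface leaks out.
    private static final class Context implements CompactedTopicContextView {
        private final long lastAddConfirmed;
        private final long length;

        Context(long lastAddConfirmed, long length) {
            this.lastAddConfirmed = lastAddConfirmed;
            this.length = length;
        }

        @Override
        public long entryCount() {
            return lastAddConfirmed + 1;
        }

        @Override
        public long sizeInBytes() {
            return length;
        }
    }

    private final Context context; // null until the topic has been compacted

    CompactedTopicSketch() {
        this.context = null;
    }

    CompactedTopicSketch(long lastAddConfirmed, long length) {
        this.context = new Context(lastAddConfirmed, length);
    }

    Optional<CompactedTopicContextView> getCompactedTopicContext() {
        return Optional.ofNullable(context);
    }
}
```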







[GitHub] [pulsar] michaeljmarshall edited a comment on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
michaeljmarshall edited a comment on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-906774281


   Just took a glance, and this is a good addition. ~change. When reviewing https://github.com/apache/pulsar/pull/11564, I forgot to mention that we should provide a way to disable metrics at a topic level~. I plan to review this PR more thoroughly within 24 hours.





[GitHub] [pulsar] michaeljmarshall commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-906774281


   Just took a glance, and this is a good change. When reviewing https://github.com/apache/pulsar/pull/11564, I forgot to mention that we should provide a way to disable metrics at a topic level. I plan to review this PR more thoroughly within 24 hours.





[GitHub] [pulsar] Anonymitaet edited a comment on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Anonymitaet edited a comment on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-911083313


   Hi @sijia-w, in this PR, @Technoboy- has already added docs to `master`.
   
   - Could you please add the same contents to `2.8.2`?
   - If you find any inaccurate expressions, incorrect grammar usage, or other issues, you can modify the docs based on technical writing rules.
   
   Thanks!





[GitHub] [pulsar] Technoboy- commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r697286763



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java
##########
@@ -0,0 +1,168 @@
+    private void populateDimensionMap(Map<Metrics, List<String>> topicsByDimensionMap, Metrics metrics, String topic) {
+        if (!topicsByDimensionMap.containsKey(metrics)) {
+            topicsByDimensionMap.put(metrics, Lists.newArrayList(topic));
+        } else {
+            topicsByDimensionMap.get(metrics).add(topic);
+        }

Review comment:
       Yes.







[GitHub] [pulsar] Technoboy- commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r697328641



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/CompactionMetrics.java
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.metrics;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.stats.Metrics;
+import org.apache.pulsar.compaction.CompactedTopicContext;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.CompactorMXBean;
+import org.apache.pulsar.compaction.CompactorMXBeanImpl;
+
+@Slf4j
+public class CompactionMetrics extends AbstractMetrics {
+
+    private static final Buckets
+            BRK_COMPACTION_LATENCY_BUCKETS = new Buckets("brk_compaction_latencyBuckets",
+            ENTRY_LATENCY_BUCKETS_MS);
+
+    private List<Metrics> metricsCollection;
+    private Map<String, Double> tempAggregatedMetricsMap;
+    private Map<Metrics, List<String>> topicsByDimensionMap;
+    private Optional<CompactorMXBean> stats;
+    private int statsPeriodSeconds;
+
+    public CompactionMetrics(PulsarService pulsar) {
+        super(pulsar);
+        this.metricsCollection = Lists.newArrayList();
+        this.topicsByDimensionMap = Maps.newHashMap();
+        this.tempAggregatedMetricsMap = Maps.newHashMap();
+        this.stats = getCompactorMXBean();
+        this.statsPeriodSeconds = ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory())
+                .getConfig().getStatsPeriodSeconds();
+    }
+
+    @Override
+    public synchronized List<Metrics> generate() {
+        return aggregate(groupTopicsByDimension());
+    }
+
+
+    /**
+     * Aggregation by namespace.
+     *
+     * @return List<Metrics>
+     */
+    private List<Metrics> aggregate(Map<Metrics, List<String>> topicsByDimension) {
+        metricsCollection.clear();
+        if (stats.isPresent()) {
+            CompactorMXBeanImpl compactorMXBean = (CompactorMXBeanImpl) stats.get();
+            for (Map.Entry<Metrics, List<String>> entry : topicsByDimension.entrySet()) {
+                Metrics metrics = entry.getKey();
+                List<String> topics = entry.getValue();
+                tempAggregatedMetricsMap.clear();
+                for (String topic : topics) {
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_removedEventCount",
+                            compactorMXBean.getCompactionRemovedEventCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_succeedCount",
+                            compactorMXBean.getCompactionSucceedCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_failedCount",
+                            compactorMXBean.getCompactionFailedCount(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_durationTimeInMills",
+                            compactorMXBean.getCompactionDurationTimeInMills(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_readThroughput",
+                            compactorMXBean.getCompactionReadThroughput(topic));
+
+                    populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_compaction_writeThroughput",
+                            compactorMXBean.getCompactionWriteThroughput(topic));

Review comment:
       OK, good suggestion. I have updated the related implementation.
       Many thanks for your kind review.
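
For readers following the diff above: the per-namespace totals are built by adding each topic's value to a running sum keyed by metric name. The sketch below illustrates that pattern only and is not the PR's code. `addToSum` is a hypothetical stand-in for the `populateAggregationMapWithSum` helper seen in the diff, and the per-topic counts are passed in as a plain map instead of being read from `CompactorMXBeanImpl`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SumAggregationSketch {

    // Hypothetical stand-in for the helper in the diff:
    // add `value` to the running total stored under `key`.
    static void addToSum(Map<String, Double> totals, String key, double value) {
        totals.merge(key, value, Double::sum);
    }

    // Sum one counter across all topics that belong to a single namespace.
    // Topics without a recorded value contribute 0.
    static Map<String, Double> aggregate(List<String> topics,
                                         Map<String, Long> removedEventsByTopic) {
        Map<String, Double> totals = new HashMap<>();
        for (String topic : topics) {
            long removed = removedEventsByTopic.getOrDefault(topic, 0L);
            addToSum(totals, "brk_compaction_removedEventCount", removed);
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Long> perTopic = new HashMap<>();
        perTopic.put("topic-a", 3L);
        perTopic.put("topic-b", 4L);
        System.out.println(aggregate(List.of("topic-a", "topic-b"), perTopic));
    }
}
```

Using `Map.merge` keeps the "insert or add" logic in one call, so each additional metric is a single `addToSum` line inside the loop.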







[GitHub] [pulsar] Technoboy- commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-905577181


   /pulsarbot run-failure-checks





[GitHub] [pulsar] Technoboy- commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-908821889


   > LGTM. Thanks for addressing all of my feedback.
   
   Many thanks for your kind review.





[GitHub] [pulsar] Technoboy- commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r695748030



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/CompactionMetricsTest.java
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.stats.metrics.CompactionMetrics;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.common.stats.Metrics;
+import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.List;
+import java.util.Random;
+
+@Test(groups = "broker")
+public class CompactionMetricsTest extends BrokerTestBase {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testCompactionMetrics() throws Exception {
+        pulsar.getCompactor(true);
+        CompactionMetrics metrics = new CompactionMetrics(pulsar);
+
+        final String compactionRemovedEventCountKey = "brk_compaction_removedEventCount";
+        final String compactionSucceedCountKey = "brk_compaction_succeedCount";
+        final String compactionFailedCountKey = "brk_compaction_failedCount";
+        final String compactionDurationTimeInMillsKey = "brk_compaction_durationTimeInMills";
+        final String compactionReadThroughputKey = "brk_compaction_readThroughput";
+        final String compactionWriteThroughputKey = "brk_compaction_writeThroughput";
+        List<Metrics> list1 = metrics.generate();
+        Assert.assertTrue(list1.isEmpty());
+
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+        final int numMessages = 1000;
+        final int maxKeys = 10;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        Random r = new Random(0);
+
+        for (int j = 0; j < numMessages; j++) {
+            int keyIndex = r.nextInt(maxKeys);
+            String key = "key" + keyIndex;
+            byte[] data = ("my-message-" + key + "-" + j).getBytes();
+            producer.newMessage()
+                    .key(key)
+                    .value(data)
+                    .send();
+        }
+
+        pulsar.getCompactor(true).compact(topic).get();
+        List<Metrics> list2 = metrics.generate();
+        System.out.println(list2);

Review comment:
       Done.







[GitHub] [pulsar] codelipenghui merged pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739


   





[GitHub] [pulsar] Technoboy- removed a comment on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- removed a comment on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-906013201









[GitHub] [pulsar] michaeljmarshall commented on pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#issuecomment-906919071


   @Technoboy- it could also be convenient to let users aggregate these metrics at the namespace level rather than the topic level. This would be especially helpful for users with many compacted topics.
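
As a rough illustration of the namespace-level grouping suggested here, the sketch below buckets topic names by namespace so that per-topic values can later be summed per bucket. It is a minimal sketch under stated assumptions, not the PR's implementation: `namespaceOf` is a hypothetical helper that treats everything between `://` and the last `/` as the namespace, whereas the diff in this thread relies on `TopicName.get(topic).getNamespace()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class NamespaceGroupingSketch {

    // Hypothetical helper (assumption): the namespace is the part of the
    // topic name between "://" and the final "/".
    static String namespaceOf(String topic) {
        int scheme = topic.indexOf("://");
        int start = scheme < 0 ? 0 : scheme + 3;
        int end = topic.lastIndexOf('/');
        if (end < start) {
            throw new IllegalArgumentException("Not a namespaced topic: " + topic);
        }
        return topic.substring(start, end);
    }

    // Bucket topics by namespace; TreeMap keeps the output order stable.
    static Map<String, List<String>> groupByNamespace(List<String> topics) {
        Map<String, List<String>> byNamespace = new TreeMap<>();
        for (String topic : topics) {
            byNamespace
                    .computeIfAbsent(namespaceOf(topic), ns -> new ArrayList<>())
                    .add(topic);
        }
        return byNamespace;
    }

    public static void main(String[] args) {
        System.out.println(groupByNamespace(List.of(
                "persistent://my-property/use/my-ns/my-topic1",
                "persistent://my-property/use/my-ns/my-topic2",
                "persistent://other-property/use/other-ns/t")));
    }
}
```

With many compacted topics, emitting one series per namespace bucket keeps the number of Prometheus series bounded by the number of namespaces rather than the number of topics.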





[GitHub] [pulsar] Technoboy- commented on a change in pull request #11739: Expose compaction metrics to Prometheus

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on a change in pull request #11739:
URL: https://github.com/apache/pulsar/pull/11739#discussion_r697287393



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
##########
@@ -1594,6 +1594,8 @@ public void forEachTopic(Consumer<Topic> consumer) {
         });
     }
 
+
+

Review comment:
       Ok, done.



