Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/08 01:11:55 UTC

[GitHub] [pulsar] hangc0276 opened a new pull request, #17531: [improve][monitor]Add prometheusRawMetricsProvider support

hangc0276 opened a new pull request, #17531:
URL: https://github.com/apache/pulsar/pull/17531

   ### Motivation
   Currently, the Pulsar Prometheus metrics framework can generate metrics from a `PrometheusRawMetricsProvider`, but the current `PrometheusMetricsProvider` implementation doesn't implement this interface, so a newly created `PrometheusMetricsProvider` instance cannot be integrated with the current metrics system.
   
   ### Modification
   `PrometheusMetricsProvider` now supports the following features:
   - Support for labels.
   - Support for the `PrometheusRawMetricsProvider` interface.
   
   This PR is migrated from the BookKeeper Prometheus metrics provider.
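
   To make the two features above concrete, here is a minimal sketch, assuming a raw provider simply writes ready-made Prometheus text-format lines into the scrape output. `RawMetricsSource` and `LabeledCounters` are hypothetical names, not Pulsar's actual `PrometheusRawMetricsProvider` API; the sketch only shows counters keyed by name plus labels and rendered with an escaped label set.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical stand-in for a raw provider: it appends ready-made exposition text. */
interface RawMetricsSource {
    void writeAllMetrics(Writer writer) throws IOException;
}

/** Hypothetical provider whose counters are identified by name AND labels. */
class LabeledCounters implements RawMetricsSource {
    // Labels are copied into a TreeMap so the output order is stable.
    record Key(String name, Map<String, String> labels) {}

    private final Map<Key, LongAdder> counters = new ConcurrentHashMap<>();

    LongAdder counter(String name, Map<String, String> labels) {
        return counters.computeIfAbsent(new Key(name, new TreeMap<>(labels)), k -> new LongAdder());
    }

    /** Escapes a label value: backslash, double quote and line feed, as the text format requires. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    static String formatLabels(Map<String, String> labels) {
        if (labels.isEmpty()) {
            return "";
        }
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        labels.forEach((k, v) -> joiner.add(k + "=\"" + escape(v) + "\""));
        return joiner.toString();
    }

    @Override
    public void writeAllMetrics(Writer writer) throws IOException {
        for (Map.Entry<Key, LongAdder> e : counters.entrySet()) {
            Key key = e.getKey();
            writer.write(key.name() + formatLabels(key.labels()) + " " + e.getValue().sum() + "\n");
        }
    }

    /** Convenience for tests: a StringWriter never actually throws. */
    String render() {
        StringWriter out = new StringWriter();
        try {
            writeAllMetrics(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }
}
```

   Keying each counter by name and labels (rather than by name alone) is what lets two scopes share a metric name without overwriting each other.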
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] asafm commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977944721


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   Reiterating my question: the only caveat I see is this: do you see any chance that we will have two different (scopeContext, gauge/metric/...) pairs that have the same metric name and differ only in their labels?
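
   For context on this question: the Prometheus text format requires all samples of one metric to appear as a single group, with at most one `# TYPE` line per metric name. With the maps now keyed by `ScopeContext` in a `ConcurrentHashMap` instead of by name in a `ConcurrentSkipListMap`, a single name can map to several entries, and hash order gives no guarantee that they are written next to each other. A minimal sketch of one mitigation, grouping by name before writing, with a hypothetical `ScopedKey` record standing in for `ScopeContext`:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical stand-in for ScopeContext: a metric name plus its labels. */
record ScopedKey(String name, Map<String, String> labels) {}

final class GroupedGaugeWriter {
    private GroupedGaugeWriter() {}

    /** Writes one TYPE line per metric name, followed by every labelled sample of that name. */
    static String render(Map<ScopedKey, Long> gauges) {
        // Group entries by metric name; TreeMap keeps families in a deterministic order.
        Map<String, List<Map.Entry<ScopedKey, Long>>> byName = gauges.entrySet().stream()
                .collect(Collectors.groupingBy(e -> e.getKey().name(), TreeMap::new, Collectors.toList()));
        StringBuilder sb = new StringBuilder();
        byName.forEach((name, samples) -> {
            sb.append("# TYPE ").append(name).append(" gauge\n");
            for (Map.Entry<ScopedKey, Long> sample : samples) {
                sb.append(name).append(formatLabels(sample.getKey().labels()))
                        .append(' ').append(sample.getValue()).append('\n');
            }
        });
        return sb.toString();
    }

    // Label values are assumed not to need escaping in this sketch.
    private static String formatLabels(Map<String, String> labels) {
        if (labels.isEmpty()) {
            return "";
        }
        return new TreeMap<>(labels).entrySet().stream()
                .map(l -> l.getKey() + "=\"" + l.getValue() + "\"")
                .collect(Collectors.joining(",", "{", "}"));
    }
}
```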





[GitHub] [pulsar] asafm commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1253788055

   > @asafm @tjiuming Would you please help take a look? thanks.
   
   Done @hangc0276 




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977747532


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();
+        PrometheusTextFormat.writeMetricsCollectedByPrometheusClient(writer, registry);

Review Comment:
   I removed it.





[GitHub] [pulsar] hangc0276 commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1257683247

   > BTW: Force push is not good - I lose all context - I can't see only the changes you've added
   
   Sorry, I rebased onto master, which requires a force push; otherwise the push fails.




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977746795


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();

Review Comment:
   I added it back.
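
   The restored wrapper matters because `ScheduledExecutorService.scheduleAtFixedRate` suppresses all subsequent executions once a run throws, so a single failed `rotateLatencyCollection()` would silently stop latency rollover. A minimal sketch of the idea, using a hypothetical `catching` helper that is assumed to behave like Pulsar's `Runnables.catchingAndLoggingThrowables` (catch, log, keep the schedule alive) but is not its actual implementation:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CatchingTasks {
    private CatchingTasks() {}

    /** Hypothetical helper: a throwing run is logged instead of cancelling the periodic schedule. */
    static Runnable catching(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                // A real broker would use its logger; stderr keeps the sketch dependency-free.
                System.err.println("Periodic task failed, will run again on the next tick: " + t);
            }
        };
    }

    /**
     * Schedules a task that throws on its first run and reports whether it still ran
     * at least {@code runs} times within five seconds.
     */
    static boolean survivesFirstFailure(int runs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(runs);
        Runnable rotate = () -> {
            latch.countDown();
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated rotateLatencyCollection failure");
            }
        };
        executor.scheduleAtFixedRate(catching(rotate), 0, 10, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

   Scheduling `rotate` directly instead of `catching(rotate)` would make the latch time out, because the executor cancels the periodic task after its first exception.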





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977750934


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));

Review Comment:
   I will use another PR to add checks for the metrics string format.
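
   As a rough idea of what such a follow-up check might look like, the sketch below verifies that every non-comment line of a scrape has the basic `name{label="value",...} value` shape, rather than only checking that `test_metrics` appears somewhere. `ExpositionCheck` is a hypothetical name, and the pattern is a simplified assumption (no timestamps, no escaped quotes inside label values). It also needs the scrape with its line breaks intact, whereas the test above joins the lines without separators.

```java
import java.util.regex.Pattern;

final class ExpositionCheck {
    private ExpositionCheck() {}

    // Simplified sample line: metric name, optional {label="value",...}, a space, then a numeric value.
    private static final Pattern SAMPLE = Pattern.compile(
            "[a-zA-Z_:][a-zA-Z0-9_:]*"
            + "(\\{[a-zA-Z_][a-zA-Z0-9_]*=\"[^\"]*\"(,[a-zA-Z_][a-zA-Z0-9_]*=\"[^\"]*\")*\\})?"
            + " ([0-9.eE+-]+|NaN|[+-]?Inf)");

    /** Returns true if every non-blank, non-comment line looks like a sample line. */
    static boolean looksValid(String scrape) {
        return scrape.lines()
                .filter(line -> !line.isBlank() && !line.startsWith("#"))
                .allMatch(line -> SAMPLE.matcher(line).matches());
    }
}
```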





[GitHub] [pulsar] tjiuming commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
tjiuming commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1296328975

   I'm just wondering why we don't use the Prometheus client library; I think this implementation is hard to maintain.




[GitHub] [pulsar] hangc0276 commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1255144460

   > > @asafm @tjiuming Would you please help take a look? thanks.
   > 
   > Done @hangc0276
   
   @asafm Thanks for your review. I have addressed all the comments; please take another look. Thanks a lot.




[GitHub] [pulsar] asafm commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977949136


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   Oh, I read your code below.
   I'm amazed that, after spending weeks reading all the metrics code, I'm still discovering new bits.
   I was wondering what that weird `PrometheusRawMetricsProvider`, which nobody inside Pulsar actually uses, was for.
   So you're saying it is in fact a public API that other plugin developers rely on? Can you please point me to an example of such a plugin?
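
   For readers following this question: the test in this PR registers its provider with `getPulsar().addPrometheusRawMetricsProvider(...)`, which suggests the broker appends the output of each registered provider to the `/metrics` response. The sketch below models that extension point with hypothetical `RawProvider` and `MetricsEndpoint` types; judging by the `SimpleTextOutputStream` import in the diff, the real interface writes to that type rather than to a `StringBuilder`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical plugin-facing interface: append metrics text to the scrape output. */
interface RawProvider {
    void generate(StringBuilder out);
}

/** Hypothetical endpoint: writes its built-in metrics, then every registered plugin provider. */
final class MetricsEndpoint {
    // Copy-on-write: providers are registered rarely but iterated on every scrape.
    private final List<RawProvider> providers = new CopyOnWriteArrayList<>();

    void addRawMetricsProvider(RawProvider provider) {
        providers.add(provider);
    }

    String scrape() {
        StringBuilder out = new StringBuilder();
        out.append("builtin_up 1\n"); // placeholder for the broker's own metrics
        for (RawProvider provider : providers) {
            provider.generate(out);
        }
        return out.toString();
    }
}
```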
   





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977747327


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;

Review Comment:
   I added the `cachingStatsProvider` back with a small change.
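
   As background: BookKeeper's `CachingStatsProvider` wraps another provider and caches stats loggers per scope, so repeated `getStatsLogger(scope)` calls return the same logger, whereas the revised `getStatsLogger` in this PR's diff creates a `new PrometheusStatsLogger(...)` on every call. A minimal sketch of that caching idea, written with `computeIfAbsent` and hypothetical `ScopeLogger` and `CachingLoggers` types rather than BookKeeper's actual classes:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical per-scope logger. */
final class ScopeLogger {
    final String scope;

    ScopeLogger(String scope) {
        this.scope = scope;
    }
}

/** Hypothetical cache: one logger instance per scope, created lazily and then reused. */
final class CachingLoggers {
    private final ConcurrentMap<String, ScopeLogger> cache = new ConcurrentHashMap<>();
    private final Function<String, ScopeLogger> factory;

    CachingLoggers(Function<String, ScopeLogger> factory) {
        this.factory = factory;
    }

    ScopeLogger getStatsLogger(String scope) {
        // computeIfAbsent applies the factory at most once per scope, even under concurrent calls.
        return cache.computeIfAbsent(scope, factory);
    }
}
```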





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977750208


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));

Review Comment:
   done





[GitHub] [pulsar] asafm commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1253794888

   For future readers of this PR: 
   
   **Why is this PR needed?**
   Pulsar uses the BookKeeper client, which exposes its own metrics. To collect them, you implement the BK stats library API, which Pulsar already does through `PrometheusMetricsProvider`, `PrometheusStatsLogger`, etc.
   The problem with that implementation was that the API supports per-metric labels, but the implementation ignored the labels passed to it.
   
   This PR adds support for the labels passed in through the API methods.
   As a result, Pulsar's `/metrics` endpoint, which exposes Pulsar metrics in Prometheus format, will now include the BookKeeper client metrics *with* their labels (which were previously stripped).
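To make the before/after concrete: the old writer hard-coded a single `cluster` label (`name{cluster="pulsar"} value`), while the new code appends whatever labels the stats logger carries. Below is a minimal, self-contained sketch of rendering one sample line from an ordered label map. The class, the metric name and the `bookie` label are hypothetical. Escaping follows the Prometheus text exposition format (backslash, double quote, newline):

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LabeledSampleSketch {
    // Renders one sample line: name{k1="v1",k2="v2"} value, or "name value" when there are no labels.
    static String sampleLine(String name, Map<String, String> labels, String value) {
        StringBuilder sb = new StringBuilder(name);
        if (!labels.isEmpty()) {
            sb.append('{');
            boolean first = true;
            for (Map.Entry<String, String> e : labels.entrySet()) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                sb.append(e.getKey()).append("=\"").append(escape(e.getValue())).append('"');
            }
            sb.append('}');
        }
        return sb.append(' ').append(value).toString();
    }

    // Escapes a label value: backslash first, then double quote and newline.
    static String escape(String v) {
        return v.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    public static void main(String[] args) {
        Map<String, String> labels = new LinkedHashMap<>(); // insertion order = output order
        labels.put("cluster", "pulsar");   // the only label the old code emitted
        labels.put("bookie", "bk-1:3181"); // hypothetical label supplied through the BK API
        System.out.println(sampleLine("bookkeeper_client_add_entry_count", labels, "42"));
        // prints: bookkeeper_client_add_entry_count{cluster="pulsar",bookie="bk-1:3181"} 42
    }
}
```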
   




[GitHub] [pulsar] asafm commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r976577025


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();

Review Comment:
   Use `EntityUtils.toString` here instead.
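`EntityUtils.toString(response.getEntity())` (Apache HttpClient 4.x) returns the whole response body as a string in one call. Besides being shorter, it avoids a subtle problem in the hand-written loop. `readLine()` strips line terminators and the loop never re-adds them, so the collected text becomes one long line, which a line-oriented parser of the Prometheus text format would likely be unable to split. The JDK-only sketch below demonstrates the difference on an in-memory body. It uses `InputStream.readAllBytes()` (Java 9+) rather than HttpClient, and the class name is hypothetical:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class ReadBodySketch {
    // Mirrors the loop in the test: readLine() drops "\n" and append() never adds it back.
    static String readLineLoop(InputStream in) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            StringBuilder sb = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line);
            }
            return sb.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the whole body at once and keeps line breaks, similar in spirit to EntityUtils.toString.
    static String readAll(InputStream in) {
        try (InputStream input = in) {
            return new String(input.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = "# TYPE test_metrics summary\ntest_metrics_count 1\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(readLineLoop(new ByteArrayInputStream(body)));
        // prints: # TYPE test_metrics summarytest_metrics_count 1
        System.out.print(readAll(new ByteArrayInputStream(body)));
    }
}
```

Note that the original test also builds its `InputStreamReader` without an explicit charset, so it depends on the platform default.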



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();
+        PrometheusTextFormat.writeMetricsCollectedByPrometheusClient(writer, registry);

Review Comment:
   I don't understand this line. Why would we emit everything in the static default Prometheus `CollectorRegistry` into the writer, when `PulsarMetricsGenerator` already writes it?
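If both this provider and `PulsarMetricsGenerator` write the default registry, the same metric families would likely appear twice on `/metrics`. As far as I know, the Prometheus text parser rejects a second `# TYPE` line for the same metric name, so a duplicate can break the whole scrape. The sketch below shows a hypothetical test helper, not part of the PR, that could guard against this by listing metric names whose `# TYPE` line occurs more than once:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DuplicateTypeCheck {
    // Returns metric names whose "# TYPE <name> <type>" line appears more than once.
    static List<String> duplicateTypeNames(String exposition) {
        Set<String> seen = new HashSet<>();
        List<String> duplicates = new ArrayList<>();
        for (String line : exposition.split("\n")) {
            if (!line.startsWith("# TYPE ")) {
                continue;
            }
            String[] parts = line.split(" "); // ["#", "TYPE", name, type]
            if (parts.length >= 4 && !seen.add(parts[2]) && !duplicates.contains(parts[2])) {
                duplicates.add(parts[2]);
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        String payload = "# TYPE jvm_threads_started_total counter\n"
                + "jvm_threads_started_total 59\n"
                + "# TYPE jvm_threads_started_total counter\n"
                + "jvm_threads_started_total 59\n";
        System.out.println(duplicateTypeNames(payload));
        // prints: [jvm_threads_started_total]
    }
}
```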
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();

Review Comment:
   `catchingAndLoggingThrowables` was removed here.
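For context on why such a wrapper matters: the `ScheduledExecutorService.scheduleAtFixedRate` Javadoc states that if any execution of the task throws, subsequent executions are suppressed. Without a catch-all wrapper, a single exception in `rotateLatencyCollection()` would silently stop latency rotation for the lifetime of that executor. A minimal sketch of such a wrapper follows. `catchingAndLogging` is a hypothetical stand-in, not Pulsar's `Runnables.catchingAndLoggingThrowables`, and it prints to stderr instead of using a logger:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CatchingRunnableSketch {
    // Hypothetical wrapper: swallows (and reports) anything the task throws,
    // so the scheduler keeps running future executions.
    static Runnable catchingAndLogging(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                System.err.println("Scheduled task failed: " + t);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger runs = new AtomicInteger();
        // Fails only on its first run; unwrapped, that first failure would cancel all later runs.
        Runnable flaky = () -> {
            if (runs.incrementAndGet() == 1) {
                throw new IllegalStateException("first run fails");
            }
        };
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(catchingAndLogging(flaky), 0, 10, TimeUnit.MILLISECONDS);
        Thread.sleep(200);
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("task ran " + runs.get() + " times despite the first failure");
    }
}
```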



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java:
##########
@@ -18,121 +18,174 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import java.io.IOException;
-import java.io.Writer;
-import org.apache.bookkeeper.stats.Counter;
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import java.util.Enumeration;
+import java.util.Map;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
  * Logic to write metrics in Prometheus text format.
  */
 public class PrometheusTextFormatUtil {
-    static void writeGauge(Writer w, String name, String cluster, SimpleGauge<? extends Number> gauge) {
+    public static void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge<? extends Number> gauge) {
         // Example:
-        // # TYPE bookie_client_bookkeeper_ml_scheduler_completed_tasks_0 gauge
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_completed_tasks_0{cluster="pulsar"} 1044057
-        try {
-            w.append("# TYPE ").append(name).append(" gauge\n");
-            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
-                    .append(' ').append(gauge.getSample().toString()).append('\n');
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        // # TYPE bookie_storage_entries_count gauge
+        // bookie_storage_entries_count 519
+        w.write("# TYPE ").write(name).write(" gauge\n");
+        w.write(name);
+        writeLabels(w, gauge.getLabels());
+        w.write(' ').write(gauge.getSample().toString()).write('\n');
+
     }
 
-    static void writeCounter(Writer w, String name, String cluster, Counter counter) {
+    public static void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) {
         // Example:
         // # TYPE jvm_threads_started_total counter
-        // jvm_threads_started_total{cluster="test"} 59
-        try {
-            w.append("# TYPE ").append(name).append(" counter\n");
-            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
-                    .append(' ').append(counter.get().toString()).append('\n');
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        // jvm_threads_started_total 59
+        w.write("# TYPE ").write(name).write(" counter\n");
+        w.write(name);
+        writeLabels(w, counter.getLabels());
+        w.write(' ').write(counter.get().toString()).write('\n');
     }
 
-    static void writeOpStat(Writer w, String name, String cluster, DataSketchesOpStatsLogger opStat) {
+    public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) {
         // Example:
-        // # TYPE pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued summary
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.5"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.75"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.95"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.99"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.999"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.9999"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="1.0"} -Infinity
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="false"} 0
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="false"} 0.0
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.5"} 0.031
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.75"} 0.043
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.95"} 0.061
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.99"} 0.064
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.999"} 0.073
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.9999"} 0.073
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="1.0"} 0.552
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="true"} 40911432
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="true"} 527.0
-        try {
-            w.append("# TYPE ").append(name).append(" summary\n");
-            writeQuantile(w, opStat, name, cluster, false, 0.5);
-            writeQuantile(w, opStat, name, cluster, false, 0.75);
-            writeQuantile(w, opStat, name, cluster, false, 0.95);
-            writeQuantile(w, opStat, name, cluster, false, 0.99);
-            writeQuantile(w, opStat, name, cluster, false, 0.999);
-            writeQuantile(w, opStat, name, cluster, false, 0.9999);
-            writeQuantile(w, opStat, name, cluster, false, 1.0);
-            writeCount(w, opStat, name, cluster, false);
-            writeSum(w, opStat, name, cluster, false);
-
-            writeQuantile(w, opStat, name, cluster, true, 0.5);
-            writeQuantile(w, opStat, name, cluster, true, 0.75);
-            writeQuantile(w, opStat, name, cluster, true, 0.95);
-            writeQuantile(w, opStat, name, cluster, true, 0.99);
-            writeQuantile(w, opStat, name, cluster, true, 0.999);
-            writeQuantile(w, opStat, name, cluster, true, 0.9999);
-            writeQuantile(w, opStat, name, cluster, true, 1.0);
-            writeCount(w, opStat, name, cluster, true);
-            writeSum(w, opStat, name, cluster, true);
-
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 1.706
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 1.89
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 2.121
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 10.708
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002
+        w.write("# TYPE ").write(name).write(" summary\n");
+        writeQuantile(w, opStat, name, false, 0.5);
+        writeQuantile(w, opStat, name, false, 0.75);
+        writeQuantile(w, opStat, name, false, 0.95);
+        writeQuantile(w, opStat, name, false, 0.99);
+        writeQuantile(w, opStat, name, false, 0.999);
+        writeQuantile(w, opStat, name, false, 0.9999);
+        writeQuantile(w, opStat, name, false, 1.0);
+        writeCount(w, opStat, name, false);
+        writeSum(w, opStat, name, false);
+
+        writeQuantile(w, opStat, name, true, 0.5);
+        writeQuantile(w, opStat, name, true, 0.75);
+        writeQuantile(w, opStat, name, true, 0.95);
+        writeQuantile(w, opStat, name, true, 0.99);
+        writeQuantile(w, opStat, name, true, 0.999);
+        writeQuantile(w, opStat, name, true, 0.9999);
+        writeQuantile(w, opStat, name, true, 1.0);
+        writeCount(w, opStat, name, true);
+        writeSum(w, opStat, name, true);
+    }
+
+    private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                      Boolean success, double quantile) {
+        w.write(name)
+                .write("{success=\"").write(success.toString())
+                .write("\",quantile=\"").write(Double.toString(quantile));
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n');
+    }
+
+    private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                   Boolean success) {
+        w.write(name).write("_count{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Long.toString(opStat.getCount(success))).write('\n');
+    }
+
+    private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                 Boolean success) {
+        w.write(name).write("_sum{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
         }
+        w.write("} ")
+                .write(Double.toString(opStat.getSum(success))).write('\n');
     }
 
-    private static void writeQuantile(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster,
-                                      Boolean success, double quantile) throws IOException {
-        w.append(name).append("{cluster=\"").append(cluster).append("\", success=\"")
-                .append(success.toString()).append("\", quantile=\"")
-                .append(Double.toString(quantile)).append("\"} ")
-                .append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n');
+    public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) {

Review Comment:
   Do we need this? Is it called from anywhere once we remove the call discussed in the earlier comment?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   Since you already have the integration built and are querying the real metrics endpoint, why not check for a real BK client metric *with* the labels you expect, which were previously missing and should now be present?
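One way to act on this suggestion, sketched under assumptions: a small helper that scans the scraped text for a sample of a given metric carrying an expected label. The helper is hypothetical, and so is the choice of BK client metric and labels to check. It assumes label values contain no commas or `}`, so a real test would more likely use a proper parser such as the existing `parseMetrics()`. It trims each pair because the diff writes a space before the provider labels (`quantile="0.5", cluster="pulsar"`):

```java
class LabelAssertSketch {
    // True if some sample line of exactly `metric` carries the label key="value".
    static boolean hasSampleWithLabel(String exposition, String metric, String key, String value) {
        String needle = key + "=\"" + value + "\"";
        for (String line : exposition.split("\n")) {
            if (line.startsWith("#")) {
                continue; // skip # TYPE / # HELP lines
            }
            int open = line.indexOf('{');
            if (open < 0 || !line.substring(0, open).equals(metric)) {
                continue;
            }
            int close = line.indexOf('}', open);
            if (close < 0) {
                continue;
            }
            for (String pair : line.substring(open + 1, close).split(",")) {
                if (pair.trim().equals(needle)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String scraped = "# TYPE test_metrics summary\n"
                + "test_metrics{success=\"true\",quantile=\"0.5\", cluster=\"pulsar\"} 0.0\n";
        System.out.println(hasSampleWithLabel(scraped, "test_metrics", "cluster", "pulsar"));
        // prints: true
    }
}
```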
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));

Review Comment:
   Maybe use `parseMetrics()` here.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   `writeAllMetrics` is currently called from `PrometheusMetricsGenerator`:
   
   ```java
               generateManagedLedgerBookieClientMetrics(pulsar, stream);
   ```
   
   After that call, we have support for `PrometheusRawMetricsProvider`:
   ```java
               if (metricsProviders != null) {
                   for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
                       metricsProvider.generate(stream);
                   }
               }
   ```
   
   I haven't seen any registration of `PrometheusMetricsProvider` as a `PrometheusRawMetricsProvider` other than in the test. If you do register it, you will have duplicates.
   
   Since writing directly to `SimpleTextOutputStream` is more correct, I advise the following:
   1. `writeAllMetrics()` should simply throw an exception and have no implementation, since it shouldn't be called at all.
   2. Fix the implementation of `generateManagedLedgerBookieClientMetrics` to call a method that uses `SimpleTextOutputStream`, and drop the declaration that the class implements `PrometheusRawMetricsProvider`; it can just be a public method.
   
   The only caveat I see is this: do you see any chance that two different (`scopeContext`, gauge/metric/...) pairs end up with the same metric name, perhaps differing only in their labels?
   
   If you ditch the `Writer`-based method, you can get rid of `PrometheusTextFormat.java`.
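   
   A minimal sketch of points 1 and 2, assuming a `StringBuilder` in place of Pulsar's `SimpleTextOutputStream`; `ProviderSketch` and its counter map are hypothetical. The `Writer`-based entry point refuses to run, so `generate` is the only output path and the same metrics cannot be written twice.
   
   ```java
   import java.io.IOException;
   import java.io.Writer;
   import java.util.Map;
   import java.util.TreeMap;

   /** Hypothetical stand-in for PrometheusMetricsProvider after the suggested change. */
   final class ProviderSketch {
       // metric name -> value; TreeMap only to keep the output order stable
       private final Map<String, Long> counters = new TreeMap<>();

       void inc(String name) {
           counters.merge(name, 1L, Long::sum);
       }

       /** Disabled on purpose: metrics must only flow through generate(). */
       void writeAllMetrics(Writer writer) throws IOException {
           throw new UnsupportedOperationException("use generate() instead");
       }

       /** The single output path; StringBuilder stands in for SimpleTextOutputStream. */
       void generate(StringBuilder out) {
           for (Map.Entry<String, Long> e : counters.entrySet()) {
               out.append("# TYPE ").append(e.getKey()).append(" counter\n");
               out.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
           }
       }
   }
   ```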



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;

Review Comment:
   @hangc0276 Previously there was a caching layer, `cachingStatsProvider`, which stored every `StatsLogger` created via `getStatsLogger` in a map and returned the existing instance if one had already been created.
   Now it has been removed.
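   
   For context, the removed caching behavior can be sketched roughly like this. `ScopedLogger` and `CachingProvider` are hypothetical stand-ins, not BookKeeper's `CachingStatsProvider` API: the first `getStatsLogger(scope)` call creates the logger through `ConcurrentHashMap.computeIfAbsent`, and later calls with the same scope return that same instance.
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.function.Function;

   /** Hypothetical stand-in for a StatsLogger bound to one scope. */
   final class ScopedLogger {
       final String scope;

       ScopedLogger(String scope) {
           this.scope = scope;
       }
   }

   /** Sketch of a caching layer in front of a logger factory. */
   final class CachingProvider {
       private final ConcurrentMap<String, ScopedLogger> cache = new ConcurrentHashMap<>();
       private final Function<String, ScopedLogger> factory;

       CachingProvider(Function<String, ScopedLogger> factory) {
           this.factory = factory;
       }

       /** Creates the logger on the first call for a scope; later calls reuse it. */
       ScopedLogger getStatsLogger(String scope) {
           return cache.computeIfAbsent(scope, factory);
       }
   }
   ```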



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));

Review Comment:
   This doesn't test the full functionality, and I think it may be duplicated when calling and parsing.
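   
   One way to check more than a substring match, sketched under assumptions: parse the scraped body and flag any repeated `# TYPE` declaration or any repeated series (metric name plus label block). `ExpositionChecks.findDuplicates` is a hypothetical helper, separate from the existing `parseMetrics`, and it assumes one sample per line in the Prometheus text format.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   final class ExpositionChecks {
       /**
        * Returns every line that repeats an earlier "# TYPE" declaration for the
        * same metric name, or an earlier series (metric name plus label block).
        */
       static List<String> findDuplicates(String body) {
           Set<String> typedNames = new HashSet<>();
           Set<String> series = new HashSet<>();
           List<String> duplicates = new ArrayList<>();
           for (String line : body.split("\n")) {
               if (line.startsWith("# TYPE ")) {
                   // "# TYPE <name> <type>": the name is the third token
                   String name = line.split(" ")[2];
                   if (!typedNames.add(name)) {
                       duplicates.add(line);
                   }
               } else if (!line.isEmpty() && !line.startsWith("#")) {
                   // Series key: up to the closing '}' if labels exist, else up to the first space
                   int brace = line.lastIndexOf('}');
                   int space = line.indexOf(' ');
                   String key = brace >= 0 ? line.substring(0, brace + 1)
                           : (space > 0 ? line.substring(0, space) : line);
                   if (!series.add(key)) {
                       duplicates.add(line);
                   }
               }
           }
           return duplicates;
       }
   }
   ```
   
   Note that the test above appends each `readLine()` result without a line break, so the body would need to keep its `\n` separators before a line-based check like this can work.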
   





[GitHub] [pulsar] asafm commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r983490703


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java:
##########
@@ -315,12 +314,6 @@ private static void generateManagedLedgerBookieClientMetrics(PulsarService pulsa
             return;
         }
 
-        try {
-            Writer writer = new StringWriter();
-            statsProvider.writeAllMetrics(writer);
-            stream.write(writer.toString());
-        } catch (IOException e) {
-            // nop
-        }
+        ((PrometheusMetricsProvider) statsProvider).generate(stream);

Review Comment:
   It's weird to have a cast in software you wrote - you usually see this when you use third-party software. Why can't you change the original variable type?
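   
   A hypothetical sketch of the alternative: keep the reference typed as the concrete provider where it is stored, so the call compiles without a downcast. `StatsProviderLike`, `ConcretePrometheusProvider`, `Service`, and `GeneratorSketch` are illustrative names, not Pulsar classes, and the emitted metric line is made up.
   
   ```java
   /** Hypothetical minimal stats-provider interface. */
   interface StatsProviderLike {
       void start();
   }

   /** Hypothetical concrete provider that can also render Prometheus text. */
   final class ConcretePrometheusProvider implements StatsProviderLike {
       @Override
       public void start() {
           // nothing to start in this sketch
       }

       void generate(StringBuilder out) {
           out.append("bookkeeper_client_up 1\n"); // made-up sample line
       }
   }

   /** Stores the provider with its concrete type, so callers never need a cast. */
   final class Service {
       private final ConcretePrometheusProvider statsProvider = new ConcretePrometheusProvider();

       ConcretePrometheusProvider getStatsProvider() {
           return statsProvider;
       }
   }

   final class GeneratorSketch {
       static String generateBookieClientMetrics(Service service) {
           StringBuilder out = new StringBuilder();
           // No ((ConcretePrometheusProvider) ...) cast: the declared type already fits.
           service.getStatsProvider().generate(out);
           return out.toString();
       }
   }
   ```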



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsLogger.java:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.google.common.base.Joiner;
+import io.prometheus.client.Collector;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A {@code StatsLogger} that caches the stats objects created by other {@code StatsLogger}.
+ */
+public class CachingStatsLogger implements StatsLogger {
+
+    protected final StatsLogger underlying;
+    protected final ConcurrentMap<ScopeContext, Counter> counters;
+    protected final ConcurrentMap<ScopeContext, OpStatsLogger> opStatsLoggers;
+    protected final ConcurrentMap<ScopeContext, StatsLogger> scopeStatsLoggers;
+    protected final ConcurrentMap<ScopeContext, StatsLogger> scopeLabelStatsLoggers;
+
+    private final String scope;
+    private final Map<String, String> labels;
+
+    public CachingStatsLogger(String scope, StatsLogger statsLogger, Map<String, String> labels) {
+        this.scope = scope;
+        this.labels = labels;
+        this.underlying = statsLogger;
+        this.counters = new ConcurrentHashMap<>();
+        this.opStatsLoggers = new ConcurrentHashMap<>();
+        this.scopeStatsLoggers = new ConcurrentHashMap<>();
+        this.scopeLabelStatsLoggers = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public int hashCode() {
+        return underlying.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof CachingStatsLogger)) {
+            return false;
+        }
+        CachingStatsLogger another = (CachingStatsLogger) obj;
+        return underlying.equals(another.underlying);
+    }
+
+    @Override
+    public String toString() {
+        return underlying.toString();
+    }
+
+    @Override
+    public OpStatsLogger getOpStatsLogger(String name) {
+        return opStatsLoggers.computeIfAbsent(scopeContext(name), x -> underlying.getOpStatsLogger(name));

Review Comment:
   Why would you cache it under key = `scopeContext(name)` and not just `name`?
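   
   Assuming `scopeContext(name)` is derived only from `scope`, `labels`, and `name`: within one `CachingStatsLogger`, `scope` and `labels` never change after construction, so the composite key partitions the cache exactly as `name` alone does. It would only matter if one map were shared by loggers with different scopes or labels. A sketch with a hypothetical `Key` class standing in for `ScopeContext` and `Object` standing in for `OpStatsLogger`:
   
   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.Objects;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   /** Hypothetical stand-in for ScopeContext: full metric name plus labels. */
   final class Key {
       final String name;
       final Map<String, String> labels;

       Key(String name, Map<String, String> labels) {
           this.name = name;
           this.labels = labels;
       }

       @Override
       public boolean equals(Object o) {
           if (!(o instanceof Key)) {
               return false;
           }
           Key k = (Key) o;
           return name.equals(k.name) && labels.equals(k.labels);
       }

       @Override
       public int hashCode() {
           return Objects.hash(name, labels);
       }
   }

   /** One logger instance: scope and labels are fixed after construction. */
   final class OneLogger {
       private final String scope;
       private final Map<String, String> labels;
       final ConcurrentMap<Key, Object> byContext = new ConcurrentHashMap<>();
       final ConcurrentMap<String, Object> byName = new ConcurrentHashMap<>();

       OneLogger(String scope, Map<String, String> labels) {
           this.scope = scope;
           this.labels = Collections.unmodifiableMap(labels);
       }

       Object viaContext(String name) {
           return byContext.computeIfAbsent(new Key(scope + "_" + name, labels), k -> new Object());
       }

       Object viaName(String name) {
           return byName.computeIfAbsent(name, k -> new Object());
       }
   }
   ```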



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -24,37 +24,44 @@
 import io.prometheus.client.Collector;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {

Review Comment:
   Based on my reply to the earlier comment, I think implementing `PrometheusRawMetricsProvider` here is redundant. We should have a single `PrometheusMetricsProvider` in `PulsarService`, and its metrics should be written explicitly in `PrometheusMetricsGenerator`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java:
##########
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * Logic to write metrics in Prometheus text format.
+ */
+public class PrometheusTextFormat {
+
+    Set<String> metricNameSet = new HashSet<>();
+
+    public void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge<? extends Number> gauge) {
+        // Example:
+        // # TYPE bookie_storage_entries_count gauge
+        // bookie_storage_entries_count 519
+        writeType(w, name, "gauge");

Review Comment:
   Now that you have grouped the metrics together by metric name, the type needs to be written only once per metric name, so it should be called from outside.
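   
   A sketch of hoisting the type line out of the per-sample writer, assuming the samples are already grouped in a map from metric name to sample suffixes (label block plus value); `GroupedTextWriter` is a hypothetical name and a `StringBuilder` replaces `SimpleTextOutputStream`:
   
   ```java
   import java.util.List;
   import java.util.Map;

   final class GroupedTextWriter {
       /**
        * Writes one "# TYPE" line per metric name, followed by all of that
        * metric's samples. Each sample suffix is "{labels} value" or " value".
        */
       static void writeGauges(StringBuilder w, Map<String, List<String>> samplesByName) {
           for (Map.Entry<String, List<String>> e : samplesByName.entrySet()) {
               String name = e.getKey();
               w.append("# TYPE ").append(name).append(" gauge\n"); // once per name
               for (String sample : e.getValue()) {
                   w.append(name).append(sample).append('\n');
               }
           }
       }
   }
   ```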



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsLogger.java:
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.google.common.base.Joiner;
+import io.prometheus.client.Collector;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.stats.Counter;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+
+/**
+ * A {@code StatsLogger} that caches the stats objects created by other {@code StatsLogger}.
+ */
+public class CachingStatsLogger implements StatsLogger {
+
+    protected final StatsLogger underlying;
+    protected final ConcurrentMap<ScopeContext, Counter> counters;
+    protected final ConcurrentMap<ScopeContext, OpStatsLogger> opStatsLoggers;
+    protected final ConcurrentMap<ScopeContext, StatsLogger> scopeStatsLoggers;
+    protected final ConcurrentMap<ScopeContext, StatsLogger> scopeLabelStatsLoggers;
+
+    private final String scope;
+    private final Map<String, String> labels;
+
+    public CachingStatsLogger(String scope, StatsLogger statsLogger, Map<String, String> labels) {
+        this.scope = scope;
+        this.labels = labels;
+        this.underlying = statsLogger;
+        this.counters = new ConcurrentHashMap<>();
+        this.opStatsLoggers = new ConcurrentHashMap<>();
+        this.scopeStatsLoggers = new ConcurrentHashMap<>();
+        this.scopeLabelStatsLoggers = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public int hashCode() {
+        return underlying.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof CachingStatsLogger)) {
+            return false;
+        }
+        CachingStatsLogger another = (CachingStatsLogger) obj;
+        return underlying.equals(another.underlying);
+    }
+
+    @Override
+    public String toString() {
+        return underlying.toString();
+    }
+
+    @Override
+    public OpStatsLogger getOpStatsLogger(String name) {
+        return opStatsLoggers.computeIfAbsent(scopeContext(name), x -> underlying.getOpStatsLogger(name));
+    }
+
+    @Override
+    public Counter getCounter(String name) {
+        return counters.computeIfAbsent(scopeContext(name), x -> underlying.getCounter(name));
+    }
+
+    @Override
+    public <T extends Number> void registerGauge(String name, Gauge<T> gauge) {
+        underlying.registerGauge(name, gauge);
+    }
+
+    @Override
+    public <T extends Number> void unregisterGauge(String name, Gauge<T> gauge) {
+        underlying.unregisterGauge(name, gauge);
+    }
+
+    @Override
+    public StatsLogger scope(String name) {
+        return scopeStatsLoggers.computeIfAbsent(scopeContext(name),
+            x -> new CachingStatsLogger(scope, underlying.scope(name), labels));
+    }
+
+    @Override
+    public StatsLogger scopeLabel(String labelName, String labelValue) {
+        Map<String, String> newLabels = new TreeMap<>(labels);
+        newLabels.put(labelName, labelValue);
+        return scopeLabelStatsLoggers.computeIfAbsent(new ScopeContext(completeName(""), newLabels),
+            x -> new CachingStatsLogger(scope, underlying.scopeLabel(labelName, labelValue), newLabels));
+    }
+
+
+    @Override
+    public void removeScope(String name, StatsLogger statsLogger) {
+        scopeStatsLoggers.remove(scopeContext(name), statsLogger);
+    }
+
+    /**
+     Thread-scoped stats not currently supported.

Review Comment:
   Why is it not supported?
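   
   For reference, thread-scoped stats mean one series per recording thread, typically distinguished by a thread label. A rough sketch of the mechanism, with hypothetical names and not BookKeeper's `ThreadScopedLongAdderCounter`: each thread resolves its own `LongAdder` once via a `ThreadLocal`, registered in a shared map under the thread's name so an exporter can iterate every per-thread series. Threads that share a name would share a counter in this sketch.
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.LongAdder;

   /** Hypothetical thread-scoped counter: one LongAdder per recording thread. */
   final class ThreadScopedCounterSketch {
       // thread name -> that thread's counter; read by the exporter
       final Map<String, LongAdder> perThread = new ConcurrentHashMap<>();

       // Each thread looks up (or creates) its own adder once, then reuses it.
       private final ThreadLocal<LongAdder> local = ThreadLocal.withInitial(
               () -> perThread.computeIfAbsent(Thread.currentThread().getName(), t -> new LongAdder()));

       void inc() {
           local.get().increment();
       }

       long sumForThread(String threadName) {
           LongAdder adder = perThread.get(threadName);
           return adder == null ? 0 : adder.sum();
       }
   }
   ```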





[GitHub] [pulsar] asafm commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r983467283


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   > Yes, there are many cases. For example, a metric named publishLatency with a different topic name in its labels represents a different metric.
   
   Well @hangc0276, in that case you must use `PrometheusMetricsStreams`: this class guarantees that the output is grouped by metric name, as the Prometheus text format dictates.
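   
   The grouping idea, sketched with a hypothetical `MetricStreamsSketch` (the actual `PrometheusMetricsStreams` API may differ): samples go into a per-metric-name buffer in whatever order callers produce them, the `# TYPE` line is written when a name is first seen, and the buffers are concatenated at flush time, so each name's samples are contiguous. This sketch is not thread-safe.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   /** Hypothetical per-metric-name buffering; not thread-safe. */
   final class MetricStreamsSketch {
       // metric name -> buffered lines for that name, in first-seen order
       private final Map<String, StringBuilder> streams = new LinkedHashMap<>();

       /** Buffers one sample; the "# TYPE" line is written when the name is first seen. */
       void writeSample(String name, String type, String labels, double value) {
           StringBuilder buffer = streams.computeIfAbsent(name, n -> new StringBuilder()
                   .append("# TYPE ").append(n).append(' ').append(type).append('\n'));
           buffer.append(name).append(labels).append(' ').append(value).append('\n');
       }

       /** Concatenates all buffers so that each metric name's samples are contiguous. */
       String flush() {
           StringBuilder out = new StringBuilder();
           for (StringBuilder buffer : streams.values()) {
               out.append(buffer);
           }
           return out.toString();
       }
   }
   ```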





[GitHub] [pulsar] asafm commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1255375369

   > > For future readers of this PR:
   > > **Why is this PR needed?** Pulsar uses Bookkeeper Client, which has metrics. The way to get those metrics is to implement the BK Metric library API, which Pulsar has already done through `PrometheusMetricsProvider`, `PrometheusStatsLogger`, etc. The problem with that implementation was that the API itself supported having labels per metric, but the implementation ignored the labels provided to it.
   > > This PR aims to fix and add support for the labels given to it via the API methods. The impact is that now Pulsar `/metrics` endpoint, which exposes the Pulsar metrics in Prometheus format, will contain the Bookkeeper client metrics _with_ labels (which were previously stripped).
   > 
   > @asafm Thanks for your explanation. Another purpose of this PR is to let plugin metrics integrate with the Pulsar broker's metrics through the interface.
   > 
   > https://github.com/apache/pulsar/blob/63d4cf20e7b9c9bd24d3fcd5ba7397f0d185ce57/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java#L904-L914
   > 
   > https://github.com/apache/pulsar/blob/63d4cf20e7b9c9bd24d3fcd5ba7397f0d185ce57/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java#L1673-L1682
   
   How does your PR provide such support? That I haven't figured out yet.
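   
   To make both points concrete, a hedged sketch: `RawProvider` is a hypothetical stand-in for `PrometheusRawMetricsProvider` (with a `StringBuilder` instead of `SimpleTextOutputStream`), and `GeneratorWithPlugins` appends the output of every registered provider, like the `metricsProviders` loop in `PrometheusMetricsGenerator`. The plugin renders its samples with labels in the Prometheus text format, escaping backslash, double quote, and newline in label values.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   /** Hypothetical stand-in for PrometheusRawMetricsProvider. */
   interface RawProvider {
       void generate(StringBuilder stream);
   }

   final class TextFormat {
       /** Escapes a label value as the Prometheus text format requires. */
       static String escape(String v) {
           return v.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
       }

       /** Renders name{k1="v1",k2="v2"} value, with labels sorted by key. */
       static String sample(String name, Map<String, String> labels, long value) {
           StringBuilder sb = new StringBuilder(name);
           if (!labels.isEmpty()) {
               sb.append('{');
               String sep = "";
               for (Map.Entry<String, String> e : new TreeMap<>(labels).entrySet()) {
                   sb.append(sep).append(e.getKey()).append("=\"").append(escape(e.getValue())).append('"');
                   sep = ",";
               }
               sb.append('}');
           }
           return sb.append(' ').append(value).append('\n').toString();
       }
   }

   /** Hypothetical generator loop: appends the output of every registered provider. */
   final class GeneratorWithPlugins {
       private final List<RawProvider> providers = new ArrayList<>();

       void addProvider(RawProvider p) {
           providers.add(p);
       }

       String generate() {
           StringBuilder stream = new StringBuilder();
           for (RawProvider p : providers) {
               p.generate(stream);
           }
           return stream.toString();
       }
   }
   ```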




[GitHub] [pulsar] asafm commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977945813


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   I don't have enough context on this statement:
   > is used for the plugin metrics integrated with Pulsar broker.
   
   Can you please elaborate and give references?
   





[GitHub] [pulsar] asafm commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1255376642

   BTW: force-pushing is not good. I lose all context, because I can't see just the changes you've added.




[GitHub] [pulsar] asafm commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977968050


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1632,6 +1639,35 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        try {
+            rawMetricsProvider.start(new PropertiesConfiguration());
+            rawMetricsProvider.getStatsLogger("test_raw")
+                .scopeLabel("topic", "persistent://public/default/test-v1")
+                .getOpStatsLogger("writeLatency")
+                .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+            getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+            HttpClient httpClient = HttpClientBuilder.create().build();
+            final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+            HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+            Multimap<String, Metric> metrics = parseMetrics(EntityUtils.toString(response.getEntity()));
+            if (((List<Metric>) metrics.get("test_raw_writeLatency_count"))

Review Comment:
   Let's extract `((List<Metric>) metrics.get("test_raw_writeLatency_count")).get(0)` into a variable to make the next lines easier to read.
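A minimal sketch of the suggested extraction, assuming a simplified `Metric` type and a plain `Map<String, List<Metric>>` in place of the Guava `Multimap` the test actually uses (both stand-ins are hypothetical, and the expected count of `1.0` assumes the single `registerSuccessfulEvent` call in the test):

```java
import java.util.List;
import java.util.Map;

class ExtractVariableSketch {
    // Hypothetical minimal stand-in for the test's parsed Metric type.
    static final class Metric {
        final Map<String, String> tags;
        final double value;

        Metric(Map<String, String> tags, double value) {
            this.tags = tags;
            this.value = value;
        }
    }

    // Pull the first sample out once, then check the local variable instead of
    // repeating the cast-and-get expression on every line. In the real test,
    // `metrics` is a Guava Multimap whose get() result is cast to List<Metric>.
    static boolean hasExpectedSample(Map<String, List<Metric>> metrics) {
        Metric writeLatencyCount = metrics.get("test_raw_writeLatency_count").get(0);
        return writeLatencyCount.value == 1.0
                && "persistent://public/default/test-v1".equals(writeLatencyCount.tags.get("topic"));
    }
}
```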



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -24,37 +24,44 @@
 import io.prometheus.client.Collector;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {

Review Comment:
   I don't see a reason to implement `PrometheusRawMetricsProvider`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * A {@code CachingStatsProvider} adds the caching functionality to an existing {@code StatsProvider}.
+ *
+ * <p>The stats provider will cache the stats objects created by the other {@code StatsProvider} to allow
+ * the reusability of stats objects and avoid creating a lot of stats objects.
+ */
+public class CachingStatsProvider implements StatsProvider {
+
+    protected final StatsProvider underlying;
+    protected final ConcurrentMap<String, StatsLogger> statsLoggers;
+
+    public CachingStatsProvider(StatsProvider provider) {
+        this.underlying = provider;
+        this.statsLoggers = new ConcurrentHashMap<String, StatsLogger>();
+    }
+
+    @Override
+    public void start(Configuration conf) {
+        this.underlying.start(conf);
+    }
+
+    @Override
+    public void stop() {
+        this.underlying.stop();
+    }
+
+    @Override
+    public StatsLogger getStatsLogger(String scope) {

Review Comment:
   Now that we have labels, should the key change from `String` to `ScopeContext`? The scope is just the metric prefix, and in theory you can have the same metric prefix with different labels.
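To illustrate the concern: if the registry maps are keyed by the scope `String` alone, two series that share a prefix but differ in labels would collapse into one entry. A composite key over scope and labels, along the lines of the `ScopeContext` key the diff introduces, keeps them apart. The classes below are a hypothetical sketch, not the PR's `ScopeContext`:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical composite key: two keys are equal only if the scope
// (metric name) and every label match.
final class ScopeKeySketch {
    final String scope;
    final Map<String, String> labels;

    ScopeKeySketch(String scope, Map<String, String> labels) {
        this.scope = scope;
        // Sorted, unmodifiable copy: the key cannot change after it is stored.
        this.labels = Collections.unmodifiableMap(new TreeMap<>(labels));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ScopeKeySketch)) {
            return false;
        }
        ScopeKeySketch other = (ScopeKeySketch) o;
        return scope.equals(other.scope) && labels.equals(other.labels);
    }

    @Override
    public int hashCode() {
        return Objects.hash(scope, labels);
    }
}

// Hypothetical counter registry keyed by scope plus labels.
class CounterRegistrySketch {
    final ConcurrentMap<ScopeKeySketch, LongAdder> counters = new ConcurrentHashMap<>();

    LongAdder counter(String scope, Map<String, String> labels) {
        return counters.computeIfAbsent(new ScopeKeySketch(scope, labels), k -> new LongAdder());
    }
}
```

Map equality in Java is defined over entries, so two label maps with the same pairs compare equal regardless of the order in which the caller built them.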



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1632,6 +1639,35 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        try {
+            rawMetricsProvider.start(new PropertiesConfiguration());
+            rawMetricsProvider.getStatsLogger("test_raw")
+                .scopeLabel("topic", "persistent://public/default/test-v1")
+                .getOpStatsLogger("writeLatency")
+                .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+            getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   Why are you adding this test?
   
   The whole feature is about adding support for labels to BK client metrics. You changed `PrometheusMetricsProvider` to do that.
   You changed the integration point of `PrometheusMetricsProvider` to `PrometheusMetricsGenerator`.
   
   I don't see any relation to `PrometheusRawMetricsProvider`.
   
   To reiterate my last comment: unless I'm missing something big here, you should test that BK client metrics now have labels.
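One way to write the check the reviewer asks for is to scrape `/metrics` and assert that a BK client sample carries the expected label. The helper below is a hypothetical sketch operating on the scraped text; it assumes label values need no escaping, and any metric or label names passed to it are illustrative rather than actual BookKeeper client metric names.

```java
import java.util.Arrays;

class LabelCheckSketch {
    // Returns true if some sample line of `metricName` in the exposition text
    // carries the label pair name="value". Comment lines (# TYPE, # HELP) are skipped.
    static boolean hasSampleWithLabel(String exposition, String metricName,
                                      String labelName, String labelValue) {
        String labelPair = labelName + "=\"" + labelValue + "\"";
        return Arrays.stream(exposition.split("\n"))
                .filter(line -> !line.startsWith("#"))
                // Require "{" right after the name so longer names with the same prefix don't match.
                .filter(line -> line.startsWith(metricName + "{"))
                .anyMatch(line -> line.contains(labelPair));
    }
}
```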



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormat.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import java.util.Enumeration;
+import java.util.Map;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
+
+/**
+ * Logic to write metrics in Prometheus text format.
+ */
+public class PrometheusTextFormat {
+    public static void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge<? extends Number> gauge) {
+        // Example:
+        // # TYPE bookie_storage_entries_count gauge
+        // bookie_storage_entries_count 519
+        w.write("# TYPE ").write(name).write(" gauge\n");
+        w.write(name);
+        writeLabels(w, gauge.getLabels());
+        w.write(' ').write(gauge.getSample().toString()).write('\n');
+
+    }
+
+    public static void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) {
+        // Example:
+        // # TYPE jvm_threads_started_total counter
+        // jvm_threads_started_total 59
+        w.write("# TYPE ").write(name).write(" counter\n");
+        w.write(name);
+        writeLabels(w, counter.getLabels());
+        w.write(' ').write(counter.get().toString()).write('\n');
+    }
+
+    public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) {
+        // Example:
+        // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 1.706
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 1.89
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 2.121
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 10.708
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002
+        w.write("# TYPE ").write(name).write(" summary\n");
+        writeQuantile(w, opStat, name, false, 0.5);
+        writeQuantile(w, opStat, name, false, 0.75);
+        writeQuantile(w, opStat, name, false, 0.95);
+        writeQuantile(w, opStat, name, false, 0.99);
+        writeQuantile(w, opStat, name, false, 0.999);
+        writeQuantile(w, opStat, name, false, 0.9999);
+        writeQuantile(w, opStat, name, false, 1.0);
+        writeCount(w, opStat, name, false);
+        writeSum(w, opStat, name, false);
+
+        writeQuantile(w, opStat, name, true, 0.5);
+        writeQuantile(w, opStat, name, true, 0.75);
+        writeQuantile(w, opStat, name, true, 0.95);
+        writeQuantile(w, opStat, name, true, 0.99);
+        writeQuantile(w, opStat, name, true, 0.999);
+        writeQuantile(w, opStat, name, true, 0.9999);
+        writeQuantile(w, opStat, name, true, 1.0);
+        writeCount(w, opStat, name, true);
+        writeSum(w, opStat, name, true);
+    }
+
+    private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                      Boolean success, double quantile) {
+        w.write(name)
+                .write("{success=\"").write(success.toString())
+                .write("\",quantile=\"").write(Double.toString(quantile));
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n');
+    }
+
+    private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                   Boolean success) {
+        w.write(name).write("_count{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Long.toString(opStat.getCount(success))).write('\n');
+    }
+
+    private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                 Boolean success) {
+        w.write(name).write("_sum{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Double.toString(opStat.getSum(success))).write('\n');
+    }
+
+    public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) {

Review Comment:
   Do you need this method?
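`writeQuantile`, `writeCount`, and `writeSum` in the diff above each repeat the same branch for appending the provider's labels after the fixed `success`/`quantile` labels. A hypothetical sketch that builds the label block once from a single ordered map is shown below; note that it joins pairs with a bare `,`, whereas the diff writes `", "` before the provider's labels.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SummaryLineSketch {
    // Renders one summary sample line, e.g.
    // name{success="true",quantile="0.5",cluster="pulsar"} 1.706
    // Extra labels follow the fixed ones; values are assumed not to need escaping.
    static String quantileLine(String name, boolean success, double quantile,
                               Map<String, String> labels, double value) {
        Map<String, String> all = new LinkedHashMap<>(); // preserves insertion order
        all.put("success", Boolean.toString(success));
        all.put("quantile", Double.toString(quantile));
        all.putAll(labels);
        return name + formatLabels(all) + " " + value;
    }

    // Builds the {k="v",...} block once, so count/sum/quantile lines can share it.
    static String formatLabels(Map<String, String> labels) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : labels.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(e.getKey()).append("=\"").append(e.getValue()).append('"');
            first = false;
        }
        return sb.append('}').toString();
    }
}
```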





[GitHub] [pulsar] github-actions[bot] commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1296066844

   The PR had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r979704869


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   Yes, there are many such cases. For example, a metric named `publishLatency` can carry a topic label, and each distinct topic name represents a different metric.





[GitHub] [pulsar] github-actions[bot] commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1333056759

   The PR had no activity for 30 days; marking it with the Stale label.




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977748287


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   Good point, updated the code.





[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977749949


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   This PR is not mainly aimed at the BK client metrics; it is meant for plugin metrics integrated with the Pulsar broker.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();

Review Comment:
   done




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r979724814


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -24,37 +24,44 @@
 import io.prometheus.client.Collector;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {

Review Comment:
   Refer to https://github.com/apache/pulsar/pull/17531#discussion_r979710232




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r979710232


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   Yes, it is a public API that lets plugins running inside the Pulsar broker expose their metrics on the broker's metrics port. One public example is KOP: https://github.com/streamnative/kop/blob/a9f56e9b0435429dd8d977ba948c6772e6fe5b86/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaProtocolHandler.java#L280
   I'm developing another plugin that also needs this public API to expose metrics.
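A minimal sketch of the plugin pattern described above, assuming hypothetical stand-ins: `RawMetricsProvider` plays the role of Pulsar's `PrometheusRawMetricsProvider`, and `BrokerMetricsEndpoint.addRawMetricsProvider` plays the role of `addPrometheusRawMetricsProvider`. The method name `writeMetrics`, the use of `StringBuilder` instead of Pulsar's `SimpleTextOutputStream`, and all metric names are assumptions for illustration, not Pulsar's actual API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for PrometheusRawMetricsProvider: a provider appends
// already-formatted Prometheus text to the broker's scrape output.
interface RawMetricsProvider {
    void writeMetrics(StringBuilder out);
}

// Hypothetical stand-in for the broker side: it keeps the registered providers
// and appends their output after its own metrics on every scrape of /metrics.
class BrokerMetricsEndpoint {
    private final List<RawMetricsProvider> providers = new CopyOnWriteArrayList<>();

    void addRawMetricsProvider(RawMetricsProvider provider) {
        providers.add(provider);
    }

    String scrape() {
        StringBuilder out = new StringBuilder();
        // The broker's own metrics (a single illustrative gauge here).
        out.append("# TYPE pulsar_topics_count gauge\n");
        out.append("pulsar_topics_count{cluster=\"pulsar\"} 3\n");
        // Plugin metrics are appended verbatim to the same response body.
        for (RawMetricsProvider provider : providers) {
            provider.writeMetrics(out);
        }
        return out.toString();
    }
}

// A hypothetical plugin exposing one counter through the broker's endpoint.
class MyPluginMetrics implements RawMetricsProvider {
    private final LongAdder requests = new LongAdder();

    void recordRequest() {
        requests.increment();
    }

    @Override
    public void writeMetrics(StringBuilder out) {
        out.append("# TYPE my_plugin_requests_total counter\n");
        out.append("my_plugin_requests_total{cluster=\"pulsar\"} ").append(requests.sum()).append('\n');
    }
}
```

In this sketch the broker only concatenates text, so each plugin is responsible for emitting valid exposition format and for avoiding metric-name collisions.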




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r977748660


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java:
##########
@@ -18,121 +18,174 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import java.io.IOException;
-import java.io.Writer;
-import org.apache.bookkeeper.stats.Counter;
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import java.util.Enumeration;
+import java.util.Map;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
  * Logic to write metrics in Prometheus text format.
  */
 public class PrometheusTextFormatUtil {
-    static void writeGauge(Writer w, String name, String cluster, SimpleGauge<? extends Number> gauge) {
+    public static void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge<? extends Number> gauge) {
         // Example:
-        // # TYPE bookie_client_bookkeeper_ml_scheduler_completed_tasks_0 gauge
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_completed_tasks_0{cluster="pulsar"} 1044057
-        try {
-            w.append("# TYPE ").append(name).append(" gauge\n");
-            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
-                    .append(' ').append(gauge.getSample().toString()).append('\n');
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        // # TYPE bookie_storage_entries_count gauge
+        // bookie_storage_entries_count 519
+        w.write("# TYPE ").write(name).write(" gauge\n");
+        w.write(name);
+        writeLabels(w, gauge.getLabels());
+        w.write(' ').write(gauge.getSample().toString()).write('\n');
+
     }
 
-    static void writeCounter(Writer w, String name, String cluster, Counter counter) {
+    public static void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) {
         // Example:
         // # TYPE jvm_threads_started_total counter
-        // jvm_threads_started_total{cluster="test"} 59
-        try {
-            w.append("# TYPE ").append(name).append(" counter\n");
-            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
-                    .append(' ').append(counter.get().toString()).append('\n');
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        // jvm_threads_started_total 59
+        w.write("# TYPE ").write(name).write(" counter\n");
+        w.write(name);
+        writeLabels(w, counter.getLabels());
+        w.write(' ').write(counter.get().toString()).write('\n');
     }
 
-    static void writeOpStat(Writer w, String name, String cluster, DataSketchesOpStatsLogger opStat) {
+    public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) {
         // Example:
-        // # TYPE pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued summary
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.5"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.75"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.95"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.99"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.999"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.9999"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="1.0"} -Infinity
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="false"} 0
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="false"} 0.0
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.5"} 0.031
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.75"} 0.043
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.95"} 0.061
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.99"} 0.064
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.999"} 0.073
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.9999"} 0.073
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="1.0"} 0.552
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="true"} 40911432
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="true"} 527.0
-        try {
-            w.append("# TYPE ").append(name).append(" summary\n");
-            writeQuantile(w, opStat, name, cluster, false, 0.5);
-            writeQuantile(w, opStat, name, cluster, false, 0.75);
-            writeQuantile(w, opStat, name, cluster, false, 0.95);
-            writeQuantile(w, opStat, name, cluster, false, 0.99);
-            writeQuantile(w, opStat, name, cluster, false, 0.999);
-            writeQuantile(w, opStat, name, cluster, false, 0.9999);
-            writeQuantile(w, opStat, name, cluster, false, 1.0);
-            writeCount(w, opStat, name, cluster, false);
-            writeSum(w, opStat, name, cluster, false);
-
-            writeQuantile(w, opStat, name, cluster, true, 0.5);
-            writeQuantile(w, opStat, name, cluster, true, 0.75);
-            writeQuantile(w, opStat, name, cluster, true, 0.95);
-            writeQuantile(w, opStat, name, cluster, true, 0.99);
-            writeQuantile(w, opStat, name, cluster, true, 0.999);
-            writeQuantile(w, opStat, name, cluster, true, 0.9999);
-            writeQuantile(w, opStat, name, cluster, true, 1.0);
-            writeCount(w, opStat, name, cluster, true);
-            writeSum(w, opStat, name, cluster, true);
-
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 1.706
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 1.89
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 2.121
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 10.708
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002
+        w.write("# TYPE ").write(name).write(" summary\n");
+        writeQuantile(w, opStat, name, false, 0.5);
+        writeQuantile(w, opStat, name, false, 0.75);
+        writeQuantile(w, opStat, name, false, 0.95);
+        writeQuantile(w, opStat, name, false, 0.99);
+        writeQuantile(w, opStat, name, false, 0.999);
+        writeQuantile(w, opStat, name, false, 0.9999);
+        writeQuantile(w, opStat, name, false, 1.0);
+        writeCount(w, opStat, name, false);
+        writeSum(w, opStat, name, false);
+
+        writeQuantile(w, opStat, name, true, 0.5);
+        writeQuantile(w, opStat, name, true, 0.75);
+        writeQuantile(w, opStat, name, true, 0.95);
+        writeQuantile(w, opStat, name, true, 0.99);
+        writeQuantile(w, opStat, name, true, 0.999);
+        writeQuantile(w, opStat, name, true, 0.9999);
+        writeQuantile(w, opStat, name, true, 1.0);
+        writeCount(w, opStat, name, true);
+        writeSum(w, opStat, name, true);
+    }
+
+    private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                      Boolean success, double quantile) {
+        w.write(name)
+                .write("{success=\"").write(success.toString())
+                .write("\",quantile=\"").write(Double.toString(quantile));
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n');
+    }
+
+    private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                   Boolean success) {
+        w.write(name).write("_count{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Long.toString(opStat.getCount(success))).write('\n');
+    }
+
+    private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                 Boolean success) {
+        w.write(name).write("_sum{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
         }
+        w.write("} ")
+                .write(Double.toString(opStat.getSum(success))).write('\n');
     }
 
-    private static void writeQuantile(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster,
-                                      Boolean success, double quantile) throws IOException {
-        w.append(name).append("{cluster=\"").append(cluster).append("\", success=\"")
-                .append(success.toString()).append("\", quantile=\"")
-                .append(Double.toString(quantile)).append("\"} ")
-                .append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n');
+    public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) {

Review Comment:
   Removed it.
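The `writeQuantile`, `writeCount` and `writeSum` changes in the diff above all follow the same pattern: fixed `success` (and `quantile`) labels first, then any extra labels from `getLabels()`. A self-contained sketch of that pattern for one quantile line, assuming a plain `StringBuilder` instead of `SimpleTextOutputStream` and a hypothetical class name `SummaryLineWriter`. It also escapes label values as the Prometheus text format requires; that is a choice of this sketch, not something shown in the diff.

```java
import java.util.Map;

// Sketch: writes one summary quantile line in Prometheus text format, with the
// fixed `success`/`quantile` labels followed by any extra labels.
final class SummaryLineWriter {
    private SummaryLineWriter() {
    }

    // Label values must escape backslash, double quote and newline.
    static String escapeLabelValue(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    // Appends `k1="v1",k2="v2"` without surrounding braces.
    static void writeLabelsNoBraces(StringBuilder w, Map<String, String> labels) {
        boolean first = true;
        for (Map.Entry<String, String> e : labels.entrySet()) {
            if (!first) {
                w.append(',');
            }
            first = false;
            w.append(e.getKey()).append("=\"").append(escapeLabelValue(e.getValue())).append('"');
        }
    }

    static void writeQuantile(StringBuilder w, String name, boolean success, double quantile,
                              double value, Map<String, String> extraLabels) {
        w.append(name)
                .append("{success=\"").append(success)
                .append("\",quantile=\"").append(quantile).append('"');
        if (!extraLabels.isEmpty()) {
            w.append(',');
            writeLabelsNoBraces(w, extraLabels);
        }
        // StringBuilder.append(double) uses Double.toString, so an empty sketch prints NaN.
        w.append("} ").append(value).append('\n');
    }
}
```

A quantile with no observations is written as `NaN`, matching the `success="false"` example lines in the diff comments.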




[GitHub] [pulsar] hangc0276 commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1255143322

   > For future readers of this PR:
   > 
   > **Why is this PR needed?** Pulsar uses Bookkeeper Client, which has metrics. The way to get those metrics is to implement the BK Metric library API, which Pulsar has already done through `PrometheusMetricsProvider`, `PrometheusStatsLogger`, etc. The problem with that implementation was that the API itself supported having labels per metric, but the implementation ignored the labels provided to it.
   > 
   > This PR aims to fix and add support for the labels given to it via the API methods. The impact is that now Pulsar `/metrics` endpoint, which exposes the Pulsar metrics in Prometheus format, will contain the Bookkeeper client metrics _with_ labels (which were previously stripped).
   
   @asafm Thanks for the explanation. Another purpose of this PR is to let plugin metrics integrate with the Pulsar broker's metrics through this interface:
   https://github.com/apache/pulsar/blob/63d4cf20e7b9c9bd24d3fcd5ba7397f0d185ce57/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java#L904-L914
   
   https://github.com/apache/pulsar/blob/63d4cf20e7b9c9bd24d3fcd5ba7397f0d185ce57/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java#L1673-L1682
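To make the before/after in the quoted summary concrete: the BookKeeper stats API lets callers attach labels (the test in this thread calls `scopeLabel("topic", ...)`), while the old implementation emitted only the `cluster` label. A minimal model of how scope names and labels can propagate down to a sample line, assuming a hypothetical `LabeledScope` class; it is neither BookKeeper's `StatsLogger` nor the PR's `PrometheusStatsLogger`. The `_` separator mirrors the `getStatsName` join in the removed code of this PR's diff.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical, simplified model of a stats logger that carries labels.
final class LabeledScope {
    private final String scope;
    private final Map<String, String> labels;

    LabeledScope(String scope, Map<String, String> labels) {
        this.scope = scope;
        this.labels = Collections.unmodifiableMap(new LinkedHashMap<>(labels));
    }

    // Child scope: the name grows by "_<name>" and the labels are inherited.
    LabeledScope scope(String name) {
        return new LabeledScope(join(scope, name), labels);
    }

    // Same name, one more label (a later value for the same key wins).
    LabeledScope scopeLabel(String key, String value) {
        Map<String, String> merged = new LinkedHashMap<>(labels);
        merged.put(key, value);
        return new LabeledScope(scope, merged);
    }

    // Renders a counter sample; label values are assumed not to need escaping here.
    String counterSample(String metric, long value) {
        StringBuilder sb = new StringBuilder(join(scope, metric));
        if (!labels.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",", "{", "}");
            labels.forEach((k, v) -> joiner.add(k + "=\"" + v + "\""));
            sb.append(joiner.toString());
        }
        return sb.append(' ').append(value).toString();
    }

    private static String join(String prefix, String name) {
        return prefix.isEmpty() ? name : prefix + "_" + name;
    }
}
```

With labels kept, a per-topic logger yields a line such as `test_raw_writeCount{cluster="pulsar",topic="..."} 1`, whereas the old code would have written only `{cluster="pulsar"}`.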
   
   



[GitHub] [pulsar] hangc0276 commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1253174793

   @asafm @tjiuming Could you please take a look? Thanks.



[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r980175871


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   I added `CachingStatsLogger` to cache `scopeLabel` instances, which avoids creating many duplicate instances with the same scope and label name.
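The diff above replaces the `String`-keyed maps with `ScopeContext`-keyed ones, and the comment describes reusing instances that share a scope and label. A minimal sketch of that idea, assuming a hypothetical `MetricKey` (name plus labels) in place of `ScopeContext`, whose definition is not shown here, and `ConcurrentHashMap.computeIfAbsent` for the reuse; this is not necessarily how `CachingStatsLogger` is implemented.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical key combining a metric name with its labels, in the spirit of
// the PR's ScopeContext (whose actual fields are not shown in this diff).
final class MetricKey {
    private final String name;
    private final Map<String, String> labels;

    MetricKey(String name, Map<String, String> labels) {
        this.name = name;
        this.labels = Collections.unmodifiableMap(new HashMap<>(labels));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof MetricKey)) {
            return false;
        }
        MetricKey other = (MetricKey) o;
        return name.equals(other.name) && labels.equals(other.labels);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, labels);
    }
}

// Registry that hands out one counter per (name, labels) pair and reuses it.
final class CounterRegistry {
    private final ConcurrentMap<MetricKey, LongAdder> counters = new ConcurrentHashMap<>();

    LongAdder counter(String name, Map<String, String> labels) {
        // ConcurrentHashMap.computeIfAbsent applies the function at most once per key.
        return counters.computeIfAbsent(new MetricKey(name, labels), k -> new LongAdder());
    }

    int size() {
        return counters.size();
    }
}
```

Keying on name plus labels means the same metric name with different label values is tracked as separate series, while repeated lookups with equal labels return the same counter.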




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r980186626


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/CachingStatsProvider.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * A {@code CachingStatsProvider} adds the caching functionality to an existing {@code StatsProvider}.
+ *
+ * <p>The stats provider will cache the stats objects created by the other {@code StatsProvider} to allow
+ * the reusability of stats objects and avoid creating a lot of stats objects.
+ */
+public class CachingStatsProvider implements StatsProvider {
+
+    protected final StatsProvider underlying;
+    protected final ConcurrentMap<String, StatsLogger> statsLoggers;
+
+    public CachingStatsProvider(StatsProvider provider) {
+        this.underlying = provider;
+        this.statsLoggers = new ConcurrentHashMap<String, StatsLogger>();
+    }
+
+    @Override
+    public void start(Configuration conf) {
+        this.underlying.start(conf);
+    }
+
+    @Override
+    public void stop() {
+        this.underlying.stop();
+    }
+
+    @Override
+    public StatsLogger getStatsLogger(String scope) {

Review Comment:
   Updated.
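The new `CachingStatsProvider` in the diff above wraps an `underlying` provider and keeps a `ConcurrentMap<String, StatsLogger>` of loggers, but the body of `getStatsLogger` is cut off in this excerpt. One plausible shape for that caching decorator, written against hypothetical single-method stand-ins `SimpleStatsLogger` and `SimpleStatsProvider` rather than BookKeeper's interfaces:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-ins for BookKeeper's StatsLogger and StatsProvider,
// reduced to the single method this sketch needs.
interface SimpleStatsLogger {
    String scopeName();
}

interface SimpleStatsProvider {
    SimpleStatsLogger getStatsLogger(String scope);
}

// Decorator: asks the underlying provider for each scope's logger once, then reuses it.
final class CachingProvider implements SimpleStatsProvider {
    private final SimpleStatsProvider underlying;
    private final ConcurrentMap<String, SimpleStatsLogger> statsLoggers = new ConcurrentHashMap<>();

    CachingProvider(SimpleStatsProvider underlying) {
        this.underlying = underlying;
    }

    @Override
    public SimpleStatsLogger getStatsLogger(String scope) {
        // Only the first call for a scope reaches the underlying provider.
        return statsLoggers.computeIfAbsent(scope, underlying::getStatsLogger);
    }
}
```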




[GitHub] [pulsar] hangc0276 commented on pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#issuecomment-1258203751

   This PR changes the metric names for the BK client, so it can only be released in a major version, and the docs need to be updated. /cc @Anonymitaet 



[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r980169270


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1632,6 +1639,35 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        try {
+            rawMetricsProvider.start(new PropertiesConfiguration());
+            rawMetricsProvider.getStatsLogger("test_raw")
+                .scopeLabel("topic", "persistent://public/default/test-v1")
+                .getOpStatsLogger("writeLatency")
+                .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+            getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   The BK client metrics are covered by `testManagedLedgerBookieClientStats`, which I updated for this change:
   https://github.com/apache/pulsar/blob/90c554d3fa0efe0a24ca9af9946efa8908ccf2dc/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java#L982-L1038
   
   `testRawMetricsProvider` mainly tests `PrometheusRawMetricsProvider`: it registers the raw metrics provider with the Pulsar service via `getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);`.
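The updated test registers a provider whose op-stats logger carries a `topic` label and then reads `/metrics`; the test class already has a `parseMetrics(String)` helper, whose body is not shown in this excerpt. For illustration only, a simplified, hypothetical `SampleLine` parser that checks whether a label survived on an exposition line; the sample lines used with it are made up and are not claimed to be what the broker actually emits.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: parses one Prometheus text sample line such as
//   name{k1="v1",k2="v2",} 1.0
// Simplified on purpose: no escape handling inside label values, and label
// values must not contain commas.
final class SampleLine {
    final String name;
    final Map<String, String> labels;
    final double value;

    private SampleLine(String name, Map<String, String> labels, double value) {
        this.name = name;
        this.labels = Collections.unmodifiableMap(labels);
        this.value = value;
    }

    static SampleLine parse(String line) {
        String trimmed = line.trim();
        Map<String, String> labels = new LinkedHashMap<>();
        String name;
        String rest;
        int open = trimmed.indexOf('{');
        if (open >= 0) {
            int close = trimmed.lastIndexOf('}');
            name = trimmed.substring(0, open);
            for (String pair : trimmed.substring(open + 1, close).split(",")) {
                String p = pair.trim();
                if (p.isEmpty()) {
                    continue; // tolerates the trailing comma some writers emit
                }
                int eq = p.indexOf('=');
                String v = p.substring(eq + 1).trim();
                if (v.length() >= 2 && v.startsWith("\"") && v.endsWith("\"")) {
                    v = v.substring(1, v.length() - 1); // strip surrounding quotes
                }
                labels.put(p.substring(0, eq).trim(), v);
            }
            rest = trimmed.substring(close + 1).trim();
        } else {
            int space = trimmed.indexOf(' ');
            name = trimmed.substring(0, space);
            rest = trimmed.substring(space + 1).trim();
        }
        // The first token is the value; an optional timestamp may follow it.
        double value = Double.parseDouble(rest.split("\\s+")[0]);
        return new SampleLine(name, labels, value);
    }
}
```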





[GitHub] [pulsar] asafm commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

Posted by GitBox <gi...@apache.org>.
asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r983479898


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   @hangc0276 OK, from my understanding (correct me if I'm wrong): this PR fixes the BK Metrics API implementation so that it supports labels. *But* it doesn't fix another issue: the ability of plugins to *use* `PrometheusMetricsProvider`. 
   
   So today, and with your PR, we only print the metrics registered to the `PrometheusMetricsProvider` instance created in `ManagedLedgerFactory`, which is good, as we now get the BK client metrics with labels.
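
To illustrate what "BK client metrics with labels" looks like on the wire, here is a hedged sketch that renders one sample in the Prometheus text exposition format, such as the `topic` label that `scopeLabel("topic", ...)` attaches in the test above. The class and metric name are hypothetical, and the real `PrometheusMetricsProvider` output (naming, suffixes, label order) may differ; only the `name{key="value"} value` shape and the label-value escaping rules (backslash, double quote, line feed) come from the exposition format.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

public class LabeledSampleFormatter {
    // Escape a label value per the Prometheus text exposition format:
    // backslash, double quote, and line feed must be escaped.
    static String escapeLabelValue(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '\\': sb.append("\\\\"); break;
                case '"': sb.append("\\\""); break;
                case '\n': sb.append("\\n"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    // Format one sample line: name{k1="v1",k2="v2"} value
    static String formatSample(String name, Map<String, String> labels, double value) {
        StringBuilder sb = new StringBuilder(name);
        if (!labels.isEmpty()) {
            StringJoiner joiner = new StringJoiner(",", "{", "}");
            // TreeMap sorts labels by key so the output is deterministic.
            for (Map.Entry<String, String> e : new TreeMap<>(labels).entrySet()) {
                joiner.add(e.getKey() + "=\"" + escapeLabelValue(e.getValue()) + "\"");
            }
            sb.append(joiner.toString());
        }
        sb.append(' ').append(value);
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical metric name, loosely modeled on the test in this PR.
        System.out.println(formatSample("test_raw_writeLatency_count",
                Map.of("topic", "persistent://public/default/test-v1"), 1));
    }
}
```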
   
   *But* I believe the correct solution is to instantiate `PrometheusMetricsProvider` in `PulsarService`, pass it to `ManagedLedgerFactory`, and also make it available for any plugin to retrieve if needed.
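
The proposed wiring can be sketched as a single provider instance owned by the service, injected into the ledger factory, and exposed through a getter for plugins. All class, method, and metric names below (`SharedMetricsProvider`, `LedgerFactory`, `BrokerService`, `getMetricsProvider`) are hypothetical stand-ins for `PrometheusMetricsProvider`, `ManagedLedgerFactory`, and `PulsarService`, not their actual APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a shared metrics provider; it only records
// metric names so the wiring can be observed.
class SharedMetricsProvider {
    private final List<String> registered = Collections.synchronizedList(new ArrayList<>());

    void register(String metricName) {
        registered.add(metricName);
    }

    List<String> registeredMetrics() {
        synchronized (registered) {
            return new ArrayList<>(registered);
        }
    }
}

// Hypothetical stand-in for ManagedLedgerFactory: it receives the provider
// instead of creating its own instance.
class LedgerFactory {
    LedgerFactory(SharedMetricsProvider provider) {
        provider.register("hypothetical_bk_client_metric");
    }
}

// Hypothetical stand-in for PulsarService: it owns the single provider instance.
class BrokerService {
    // Fields initialize in declaration order, so the provider exists before the factory.
    private final SharedMetricsProvider metricsProvider = new SharedMetricsProvider();
    private final LedgerFactory ledgerFactory = new LedgerFactory(metricsProvider);

    // Plugins retrieve the same instance rather than creating their own.
    SharedMetricsProvider getMetricsProvider() {
        return metricsProvider;
    }
}

public class SharedProviderDemo {
    public static void main(String[] args) {
        BrokerService broker = new BrokerService();
        broker.getMetricsProvider().register("hypothetical_plugin_metric");
        System.out.println(broker.getMetricsProvider().registeredMetrics());
    }
}
```

Constructor injection keeps one source of truth: metrics registered by the factory and by plugins land in the same provider and would be exported together.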
   
   The way I see it, plugins should use:
   1. Prometheus Client library (preferred).
   2. BK Metrics API implementation (acceptable, but less preferred).
   3. Raw metrics provider, if all other means fail.
   
   If we're already doing this, let's at least do it right. In your case, you know you want to use `PrometheusMetricsProvider`, so maybe we should just expose it and use it properly.
   
   


