You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/21 14:23:48 UTC

[GitHub] [pulsar] asafm commented on a diff in pull request #17531: [improve][monitor]Add prometheusRawMetricsProvider support

asafm commented on code in PR #17531:
URL: https://github.com/apache/pulsar/pull/17531#discussion_r976577025


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();

Review Comment:
   Use `EntityUtils.toString` to read the response body instead of reading the stream manually.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();
+        PrometheusTextFormat.writeMetricsCollectedByPrometheusClient(writer, registry);

Review Comment:
   I don't get this line. Why would we want to emit all of the static default Prometheus collectors into the writer, when we already have that in `PrometheusMetricsGenerator`?
   



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();

Review Comment:
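   This change removed the `catchingAndLoggingThrowables` wrapper. Without it, an exception thrown by `rotateLatencyCollection()` will silently suppress all subsequent scheduled executions.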



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusTextFormatUtil.java:
##########
@@ -18,121 +18,174 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import java.io.IOException;
-import java.io.Writer;
-import org.apache.bookkeeper.stats.Counter;
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CollectorRegistry;
+import java.util.Enumeration;
+import java.util.Map;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
  * Logic to write metrics in Prometheus text format.
  */
 public class PrometheusTextFormatUtil {
-    static void writeGauge(Writer w, String name, String cluster, SimpleGauge<? extends Number> gauge) {
+    public static void writeGauge(SimpleTextOutputStream w, String name, SimpleGauge<? extends Number> gauge) {
         // Example:
-        // # TYPE bookie_client_bookkeeper_ml_scheduler_completed_tasks_0 gauge
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_completed_tasks_0{cluster="pulsar"} 1044057
-        try {
-            w.append("# TYPE ").append(name).append(" gauge\n");
-            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
-                    .append(' ').append(gauge.getSample().toString()).append('\n');
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        // # TYPE bookie_storage_entries_count gauge
+        // bookie_storage_entries_count 519
+        w.write("# TYPE ").write(name).write(" gauge\n");
+        w.write(name);
+        writeLabels(w, gauge.getLabels());
+        w.write(' ').write(gauge.getSample().toString()).write('\n');
+
     }
 
-    static void writeCounter(Writer w, String name, String cluster, Counter counter) {
+    public static void writeCounter(SimpleTextOutputStream w, String name, LongAdderCounter counter) {
         // Example:
         // # TYPE jvm_threads_started_total counter
-        // jvm_threads_started_total{cluster="test"} 59
-        try {
-            w.append("# TYPE ").append(name).append(" counter\n");
-            w.append(name).append("{cluster=\"").append(cluster).append("\"}")
-                    .append(' ').append(counter.get().toString()).append('\n');
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        // jvm_threads_started_total 59
+        w.write("# TYPE ").write(name).write(" counter\n");
+        w.write(name);
+        writeLabels(w, counter.getLabels());
+        w.write(' ').write(counter.get().toString()).write('\n');
     }
 
-    static void writeOpStat(Writer w, String name, String cluster, DataSketchesOpStatsLogger opStat) {
+    public static void writeOpStat(SimpleTextOutputStream w, String name, DataSketchesOpStatsLogger opStat) {
         // Example:
-        // # TYPE pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued summary
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.5"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.75"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.95"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.99"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.999"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="0.9999"} NaN
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="false",
-        // quantile="1.0"} -Infinity
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="false"} 0
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="false"} 0.0
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.5"} 0.031
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.75"} 0.043
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.95"} 0.061
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.99"} 0.064
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.999"} 0.073
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="0.9999"} 0.073
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued{cluster="pulsar", success="true",
-        // quantile="1.0"} 0.552
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_count{cluster="pulsar", success="true"} 40911432
-        // pulsar_bookie_client_bookkeeper_ml_scheduler_task_queued_sum{cluster="pulsar", success="true"} 527.0
-        try {
-            w.append("# TYPE ").append(name).append(" summary\n");
-            writeQuantile(w, opStat, name, cluster, false, 0.5);
-            writeQuantile(w, opStat, name, cluster, false, 0.75);
-            writeQuantile(w, opStat, name, cluster, false, 0.95);
-            writeQuantile(w, opStat, name, cluster, false, 0.99);
-            writeQuantile(w, opStat, name, cluster, false, 0.999);
-            writeQuantile(w, opStat, name, cluster, false, 0.9999);
-            writeQuantile(w, opStat, name, cluster, false, 1.0);
-            writeCount(w, opStat, name, cluster, false);
-            writeSum(w, opStat, name, cluster, false);
-
-            writeQuantile(w, opStat, name, cluster, true, 0.5);
-            writeQuantile(w, opStat, name, cluster, true, 0.75);
-            writeQuantile(w, opStat, name, cluster, true, 0.95);
-            writeQuantile(w, opStat, name, cluster, true, 0.99);
-            writeQuantile(w, opStat, name, cluster, true, 0.999);
-            writeQuantile(w, opStat, name, cluster, true, 0.9999);
-            writeQuantile(w, opStat, name, cluster, true, 1.0);
-            writeCount(w, opStat, name, cluster, true);
-            writeSum(w, opStat, name, cluster, true);
-
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        // # TYPE bookie_journal_JOURNAL_ADD_ENTRY summary
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.5",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.75",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.95",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.99",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.999",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="0.9999",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="false",quantile="1.0",} NaN
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="false",} 0.0
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.5",} 1.706
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.75",} 1.89
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.95",} 2.121
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.99",} 10.708
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.999",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="0.9999",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY{success="true",quantile="1.0",} 10.902
+        // bookie_journal_JOURNAL_ADD_ENTRY_count{success="true",} 658.0
+        // bookie_journal_JOURNAL_ADD_ENTRY_sum{success="true",} 1265.0800000000002
+        w.write("# TYPE ").write(name).write(" summary\n");
+        writeQuantile(w, opStat, name, false, 0.5);
+        writeQuantile(w, opStat, name, false, 0.75);
+        writeQuantile(w, opStat, name, false, 0.95);
+        writeQuantile(w, opStat, name, false, 0.99);
+        writeQuantile(w, opStat, name, false, 0.999);
+        writeQuantile(w, opStat, name, false, 0.9999);
+        writeQuantile(w, opStat, name, false, 1.0);
+        writeCount(w, opStat, name, false);
+        writeSum(w, opStat, name, false);
+
+        writeQuantile(w, opStat, name, true, 0.5);
+        writeQuantile(w, opStat, name, true, 0.75);
+        writeQuantile(w, opStat, name, true, 0.95);
+        writeQuantile(w, opStat, name, true, 0.99);
+        writeQuantile(w, opStat, name, true, 0.999);
+        writeQuantile(w, opStat, name, true, 0.9999);
+        writeQuantile(w, opStat, name, true, 1.0);
+        writeCount(w, opStat, name, true);
+        writeSum(w, opStat, name, true);
+    }
+
+    private static void writeQuantile(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                      Boolean success, double quantile) {
+        w.write(name)
+                .write("{success=\"").write(success.toString())
+                .write("\",quantile=\"").write(Double.toString(quantile));
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Double.toString(opStat.getQuantileValue(success, quantile))).write('\n');
+    }
+
+    private static void writeCount(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                   Boolean success) {
+        w.write(name).write("_count{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
+        }
+        w.write("} ")
+                .write(Long.toString(opStat.getCount(success))).write('\n');
+    }
+
+    private static void writeSum(SimpleTextOutputStream w, DataSketchesOpStatsLogger opStat, String name,
+                                 Boolean success) {
+        w.write(name).write("_sum{success=\"").write(success.toString());
+        if (!opStat.getLabels().isEmpty()) {
+            w.write("\", ");
+            writeLabelsNoBraces(w, opStat.getLabels());
+        } else {
+            w.write("\"");
         }
+        w.write("} ")
+                .write(Double.toString(opStat.getSum(success))).write('\n');
     }
 
-    private static void writeQuantile(Writer w, DataSketchesOpStatsLogger opStat, String name, String cluster,
-                                      Boolean success, double quantile) throws IOException {
-        w.append(name).append("{cluster=\"").append(cluster).append("\", success=\"")
-                .append(success.toString()).append("\", quantile=\"")
-                .append(Double.toString(quantile)).append("\"} ")
-                .append(Double.toString(opStat.getQuantileValue(success, quantile))).append('\n');
+    public static void writeMetricsCollectedByPrometheusClient(SimpleTextOutputStream w, CollectorRegistry registry) {

Review Comment:
   Do we need this? Is this called from anywhere once we remove the call discussed in the earlier comment?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);

Review Comment:
   If you already have the integration built in, and you are querying the real metrics endpoint, why not check that a real BK client metric is present *with* the labels you expect, now that you actually have them?
   



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));

Review Comment:
   Maybe use `parseMetrics()` here and assert on the parsed metrics.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;
-
+    private final CollectorRegistry registry;
+    private Map<String, String> labels;
     /**
      * These acts a registry of the metrics defined in this provider.
      */
-    public final ConcurrentMap<String, LongAdderCounter> counters = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, SimpleGauge<? extends Number>> gauges = new ConcurrentSkipListMap<>();
-    public final ConcurrentMap<String, DataSketchesOpStatsLogger> opStats = new ConcurrentSkipListMap<>();
+    public final ConcurrentMap<ScopeContext, LongAdderCounter> counters = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, SimpleGauge<? extends Number>> gauges = new ConcurrentHashMap<>();
+    public final ConcurrentMap<ScopeContext, DataSketchesOpStatsLogger> opStats = new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedDataSketchesStatsLogger> threadScopedOpStats =
+        new ConcurrentHashMap<>();
+    final ConcurrentMap<ScopeContext, ThreadScopedLongAdderCounter> threadScopedCounters =
+        new ConcurrentHashMap<>();
+
 
     public PrometheusMetricsProvider() {
-        this.cachingStatsProvider = new CachingStatsProvider(new StatsProvider() {
-            @Override
-            public void start(Configuration conf) {
-                // nop
-            }
-
-            @Override
-            public void stop() {
-                // nop
-            }
-
-            @Override
-            public StatsLogger getStatsLogger(String scope) {
-                return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope);
-            }
-
-            @Override
-            public String getStatsName(String... statsComponents) {
-                String completeName;
-                if (statsComponents.length == 0) {
-                    return "";
-                } else if (statsComponents[0].isEmpty()) {
-                    completeName = StringUtils.join(statsComponents, '_', 1, statsComponents.length);
-                } else {
-                    completeName = StringUtils.join(statsComponents, '_');
-                }
-                return Collector.sanitizeMetricName(completeName);
-            }
-        });
+        this(CollectorRegistry.defaultRegistry);
+    }
+
+    public PrometheusMetricsProvider(CollectorRegistry registry) {
+        this.registry = registry;
     }
 
-    @Override
     public void start(Configuration conf) {
-        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
 
+        executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metrics"));
         int latencyRolloverSeconds = conf.getInt(PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS,
                 DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS);
-        cluster = conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME);
+        labels = Collections.singletonMap(CLUSTER_NAME, conf.getString(CLUSTER_NAME, DEFAULT_CLUSTER_NAME));
+
+        executor.scheduleAtFixedRate(() -> {
+            rotateLatencyCollection();
+        }, 1, latencyRolloverSeconds, TimeUnit.SECONDS);
 
-        executor.scheduleAtFixedRate(catchingAndLoggingThrowables(this::rotateLatencyCollection),
-                1, latencyRolloverSeconds, TimeUnit.SECONDS);
     }
 
-    @Override
     public void stop() {
-        executor.shutdownNow();
+        executor.shutdown();
     }
 
-    @Override
     public StatsLogger getStatsLogger(String scope) {
-        return this.cachingStatsProvider.getStatsLogger(scope);
+        return new PrometheusStatsLogger(PrometheusMetricsProvider.this, scope, labels);
     }
 
     @Override
     public void writeAllMetrics(Writer writer) throws IOException {
-        gauges.forEach((name, gauge) -> PrometheusTextFormatUtil.writeGauge(writer, name, cluster, gauge));
-        counters.forEach((name, counter) -> PrometheusTextFormatUtil.writeCounter(writer, name, cluster, counter));
-        opStats.forEach((name, opStatLogger) -> PrometheusTextFormatUtil.writeOpStat(writer, name, cluster,
-                opStatLogger));
+        PrometheusTextFormat prometheusTextFormat = new PrometheusTextFormat();

Review Comment:
   `writeAllMetrics` is currently called from `PrometheusMetricsGenerator`:
   
   ```java
               generateManagedLedgerBookieClientMetrics(pulsar, stream);
   ```
   
   After that call we have support for `PrometheusRawMetricsProvider`
   ```java
               if (metricsProviders != null) {
                   for (PrometheusRawMetricsProvider metricsProvider : metricsProviders) {
                       metricsProvider.generate(stream);
                   }
               }
   ```
   
   I haven't seen any registration of `PrometheusMetricsProvider` as `PrometheusRawMetricsProvider` other than the test. If you do register it, you will have duplicates.
   
   Since writing directly to `SimpleTextOutputStream` is more correct, I advise the following:
   1. `writeAllMetrics()` should throw an exception and have no implementation, since it shouldn't be called at all.
   2. Fix the implementation of `generateManagedLedgerBookieClientMetrics` to call a method that uses `SimpleTextOutputStream`, and drop the `implements PrometheusRawMetricsProvider` declaration; it can just be a public method.
   
   The only caveat I see is this: Do you see any chance that we will have two different pairs of (`scopeContext`, gauge/metric/...) such that they have the same metric name, maybe the only difference is in the labels?
   
   If you ditch that Writer based method, you can get rid of `PrometheusTextFormat.java`



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/PrometheusMetricsProvider.java:
##########
@@ -18,109 +18,109 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.bookkeeper.stats.CachingStatsProvider;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
+import org.apache.pulsar.common.util.SimpleTextOutputStream;
 
 /**
- * A <i>Prometheus</i> based {@link StatsProvider} implementation.
+ * A <i>Prometheus</i> based {@link PrometheusRawMetricsProvider} implementation.
  */
-public class PrometheusMetricsProvider implements StatsProvider {
+public class PrometheusMetricsProvider implements StatsProvider, PrometheusRawMetricsProvider {
     private ScheduledExecutorService executor;
 
     public static final String PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = "prometheusStatsLatencyRolloverSeconds";
     public static final int DEFAULT_PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS = 60;
     public static final String CLUSTER_NAME = "cluster";
     public static final String DEFAULT_CLUSTER_NAME = "pulsar";
 
-    private String cluster;
-    private final CachingStatsProvider cachingStatsProvider;

Review Comment:
   @hangc0276 Previously there was a caching layer called `cachingStatsProvider`, which saved every `StatsLogger` created via `getStatsLogger` in a map. If a `StatsLogger` had already been created, it would return the cached instance.
   Now that caching layer has been removed.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java:
##########
@@ -1634,6 +1643,29 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
         return parsed;
     }
 
+    @Test
+    public void testRawMetricsProvider() throws IOException {
+        PrometheusMetricsProvider rawMetricsProvider = new PrometheusMetricsProvider();
+        rawMetricsProvider.start(new PropertiesConfiguration());
+        rawMetricsProvider.getStatsLogger("test").getOpStatsLogger("test_metrics")
+            .registerSuccessfulEvent(100, TimeUnit.NANOSECONDS);
+
+        getPulsar().addPrometheusRawMetricsProvider(rawMetricsProvider);
+        HttpClient httpClient = HttpClientBuilder.create().build();
+        final String metricsEndPoint = getPulsar().getWebServiceAddress() + "/metrics";
+        HttpResponse response = httpClient.execute(new HttpGet(metricsEndPoint));
+        InputStream inputStream = response.getEntity().getContent();
+        InputStreamReader isReader = new InputStreamReader(inputStream);
+        BufferedReader reader = new BufferedReader(isReader);
+        StringBuffer sb = new StringBuffer();
+        String str;
+        while((str = reader.readLine()) != null){
+            sb.append(str);
+        }
+        Assert.assertTrue(sb.toString().contains("test_metrics"));

Review Comment:
   This doesn't test the full functionality, and I think the metrics may be duplicated when calling the endpoint and parsing the output.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org