You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/30 18:35:21 UTC

[GitHub] sijie closed pull request #2679: Add types comment in Prometheus stats

sijie closed pull request #2679: Add types comment in Prometheus stats
URL: https://github.com/apache/pulsar/pull/2679
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a fork-based (foreign) pull request
on merge, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index d07bc386ad..36343cfe30 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -24,9 +24,12 @@
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 
 import io.netty.util.concurrent.FastThreadLocal;
 
+import java.util.Set;
+
 public class NamespaceStatsAggregator {
 
     private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats = new FastThreadLocal<AggregatedNamespaceStats>() {
@@ -43,6 +46,8 @@ protected TopicStats initialValue() throws Exception {
         }
     };
 
+    private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>();
+
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
@@ -51,6 +56,8 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b
         pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
             namespaceStats.reset();
 
+            METRIC_TYPES.forEach(metric -> TopicStats.metricType(stream, metric));
+
             bundlesMap.forEach((bundle, topicsMap) -> {
                 topicsMap.forEach((name, topic) -> {
                     getTopicStats(topic, topicStats, includeConsumerMetrics);
@@ -218,20 +225,35 @@ private static void printNamespaceStats(SimpleTextOutputStream stream, String cl
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
                                long value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\"} ");
+        if (!METRIC_TYPES.contains(name)) {
+            TopicStats.metricType(stream, name);
+            METRIC_TYPES.add(name);
+        }
+
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
                                double value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace).write("\"} ");
+        if (!METRIC_TYPES.contains(name)) {
+            TopicStats.metricType(stream, name);
+            METRIC_TYPES.add(name);
+        }
+
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
                                                 String name, String remoteCluster, double value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace);
-        stream.write("\", remote_cluster=\"").write(remoteCluster).write("\"} ");
+        if (!METRIC_TYPES.contains(name)) {
+            TopicStats.metricType(stream, name);
+            METRIC_TYPES.add(name);
+        }
+
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
+        stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 5c42b51328..e346d27c29 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -86,12 +86,16 @@ private static void generateSystemMetrics(SimpleTextOutputStream stream, String
         while (metricFamilySamples.hasMoreElements()) {
             MetricFamilySamples metricFamily = metricFamilySamples.nextElement();
 
+            // Write type of metric
+            stream.write("# TYPE ").write(metricFamily.name).write(' ')
+                    .write(getTypeStr(metricFamily.type)).write('\n');
+
             for (int i = 0; i < metricFamily.samples.size(); i++) {
                 Sample sample = metricFamily.samples.get(i);
                 stream.write(sample.name);
                 stream.write("{cluster=\"").write(cluster).write('"');
                 for (int j = 0; j < sample.labelNames.size(); j++) {
-                    stream.write(", ");
+                    stream.write(",");
                     stream.write(sample.labelNames.get(j));
                     stream.write("=\"");
                     stream.write(sample.labelValues.get(j));
@@ -104,4 +108,21 @@ private static void generateSystemMetrics(SimpleTextOutputStream stream, String
             }
         }
     }
+
+    static String getTypeStr(Collector.Type type) {
+        switch (type) {
+        case COUNTER:
+            return "counter";
+        case GAUGE:
+            return "gauge";
+        case SUMMARY        :
+            return "summary";
+        case HISTOGRAM:
+            return "histogram";
+        case UNTYPED:
+        default:
+            return "untyped";
+        }
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index c592b8f49b..ff946b95fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -18,12 +18,14 @@
  */
 package org.apache.pulsar.broker.stats.prometheus;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
-
-import java.util.HashMap;
-import java.util.Map;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 
 class TopicStats {
 
@@ -45,6 +47,7 @@
 
     Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
     Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
+    private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>();
 
     public void reset() {
         subscriptionsCount = 0;
@@ -69,6 +72,8 @@ public void reset() {
     static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
                                 TopicStats stats) {
 
+        METRIC_TYPES.forEach(metric -> metricType(stream, metric));
+
         metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
         metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount);
         metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount);
@@ -129,40 +134,69 @@ static void printTopicStats(SimpleTextOutputStream stream, String cluster, Strin
 
     }
 
+    static void metricType(SimpleTextOutputStream stream, String name) {
+        stream.write("# TYPE ").write(name).write(" gauge\n");
+    }
+
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
                                String name, double value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
-                .write("\", topic=\"").write(topic).write("\"} ");
+        if (!METRIC_TYPES.contains(name)) {
+            metricType(stream, name);
+            METRIC_TYPES.add(name);
+        }
+
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
+                .write("\",topic=\"").write(topic).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String name, long value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
-                .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription).write("\"} ");
+        if (!METRIC_TYPES.contains(name)) {
+            metricType(stream, name);
+            METRIC_TYPES.add(name);
+        }
+
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
+                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String name, double value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
-                .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription).write("\"} ");
+        if (!METRIC_TYPES.contains(name)) {
+            metricType(stream, name);
+            METRIC_TYPES.add(name);
+        }
+
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
+                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String consumerName, long consumerId, String name, long value) {
+        if (!METRIC_TYPES.contains(name)) {
+            metricType(stream, name);
+            METRIC_TYPES.add(name);
+        }
+
         stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
-                .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription)
-                .write("\", consumer_name=\"").write(consumerName).write("\", consumer_id=\"").write(consumerId).write("\"} ");
+                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
+                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String consumerName, long consumerId, String name, double value) {
-        stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
-                .write("\", topic=\"").write(topic).write("\", subscription=\"").write(subscription)
-                .write("\", consumer_name=\"").write(consumerName).write("\", consumer_id=\"").write(consumerId).write("\"} ");
+        if (!METRIC_TYPES.contains(name)) {
+            metricType(stream, name);
+            METRIC_TYPES.add(name);
+        }
+
+        stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
+                .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
+                .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 6ad8d6d5ce..91bc0b826b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -150,7 +150,7 @@ public void testPerNamespaceStats() throws Exception {
         Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
 
         Splitter.on("\n").split(metrics).forEach(line -> {
-            if (line.isEmpty()) {
+            if (line.isEmpty() || line.startsWith("#")) {
                 return;
             }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index 745f5d7297..907afaf198 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -21,11 +21,13 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.apache.pulsar.common.util.SimpleTextOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 /**
@@ -35,8 +37,12 @@
 
     private static final Logger log = LoggerFactory.getLogger(FunctionsStatsGenerator.class);
 
+    private static Set<String> METRIC_TYPES = new ConcurrentHashSet<>();
+
     public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
         if (workerService != null) {
+            METRIC_TYPES.forEach(metric -> metricType(out, metric));
+
             Map<String, FunctionRuntimeInfo> functionRuntimes
                     = workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();
 
@@ -86,10 +92,19 @@ public static void generate(WorkerService workerService, String cluster, SimpleT
         }
     }
 
+    private static void metricType(SimpleTextOutputStream stream, String name) {
+        stream.write("# TYPE ").write(name).write(" gauge\n");
+    }
+
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace,
                                String functionName, String metricName, int instanceId, double value) {
-        stream.write(metricName).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
-                .write("\", name=\"").write(functionName).write("\", instanceId=\"").write(instanceId).write("\"} ");
+        if (!METRIC_TYPES.contains(metricName)) {
+            metricType(stream, metricName);
+            METRIC_TYPES.add(metricName);
+        }
+
+        stream.write(metricName).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
+                .write("\",name=\"").write(functionName).write("\",instanceId=\"").write(instanceId).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
index d320514ed5..98168229bf 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionStatsGeneratorTest.java
@@ -168,7 +168,7 @@ public void testFunctionsStatsGenerate() {
         Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
 
         Arrays.asList(metrics.split("\n")).forEach(line -> {
-            if (line.isEmpty()) {
+            if (line.isEmpty() || line.startsWith("#")) {
                 return;
             }
             Matcher matcher = pattern.matcher(line);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services