You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/08/26 06:36:06 UTC

[pulsar] branch master updated: Adding back Prometheus TYPE definitions; fixed duplicate TYPE errors; fixed format issue in metricWithRemoteCluster; added test for Prometheus types (#4183)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d32b58  Adding back Prometheus TYPE definitions; fixed duplicate TYPE errors; fixed format issue in metricWithRemoteCluster; added test for Prometheus types (#4183)
8d32b58 is described below

commit 8d32b588e2ccba31fbd41e7d508400de20a71405
Author: Chris Bartholomew <ch...@kafkaesque.io>
AuthorDate: Mon Aug 26 02:36:01 2019 -0400

    Adding back Prometheus TYPE definitions; fixed duplicate TYPE errors; fixed format issue in metricWithRemoteCluster; added test for Prometheus types (#4183)
    
    Fixes #3112
    
    ### Motivation
    
    In this pull request, I set out to fix #3112, which is caused by duplicate TYPE statements in the metrics output which leads to parsing of Prometheus metrics to fail in recent versions of Prometheus. Because of this, Prometheus will report the broker target as down.
    
    Since I started looking at this, the type definitions have been removed (#4136) from topics metric output. I think these types are useful in Prometheus and have added them back in.
    
    While testing this fix in my geo-replicated setup, I found a format in error (missing quote and comma) in the TopicStats.metricWithRemoteCluster method. This pull request includes a fix for that issue.
    
    I have also added a new test to PrometheusMetricsTest.java that fails without these changes but passed with them.
    
    ### Modifications
    
    I added a static HashMap to TopicStats to keep track of the TYPEs that have been output. All writing of the TYPE for topics and namespaces is done with the TopicStats.metricType method. I modified that method to update the HashMap and only print the TYPE out for the first occurrence of the metric name.  I also added a method reset the HashMap, which gets called in NamespaceStatsAggregator.generate.
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
      - Added testDuplicateMetricTypeDefinitions which checks for:
           - duplicate TYPE definitions in the Prometheus output
           - validates that no TYPE definition appears after the first metric sample
           - ensures that all metrics have a defined type
    
    I execute the test twice to make sure the resetting of the HashMap of the already seen metric type definitions works correctly. This test passes for me reliably (both occurrences).
    
    I have confirmed using promtool that the metrics output will now parse without error using versions 2.7.1 and 2.9.2 (which is the latest). There are many warnings around missing HELP definitions and metrics using reserved suffixes (ex. _count), but no errors.
    
    In addition, I have patched 2.3.1 with this fix and am currently running it in my cluster. Prometheus (2.7.1) successfully parses the metrics and I am able to see namespace and topic-level metrics.
---
 .../stats/prometheus/NamespaceStatsAggregator.java |  5 ++
 .../pulsar/broker/stats/prometheus/TopicStats.java | 23 +++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 97 ++++++++++++++++++++++
 3 files changed, 125 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
index b1dfbbd..93d56b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java
@@ -49,6 +49,7 @@ public class NamespaceStatsAggregator {
     public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics, SimpleTextOutputStream stream) {
         String cluster = pulsar.getConfiguration().getClusterName();
         AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
+        TopicStats.resetTypes();
         TopicStats topicStats = localTopicStats.get();
 
         printDefaultBrokerStats(stream, cluster);
@@ -260,6 +261,7 @@ public class NamespaceStatsAggregator {
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String name,
             long value) {
+        TopicStats.metricType(stream, name);
         stream.write(name)
                 .write("{cluster=\"").write(cluster).write("\"} ")
                 .write(value).write(' ').write(System.currentTimeMillis())
@@ -268,18 +270,21 @@ public class NamespaceStatsAggregator {
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
                                long value) {
+        TopicStats.metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
                                double value) {
+        TopicStats.metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
     }
 
     private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
                                                 String name, String remoteCluster, double value) {
+        TopicStats.metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
         stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
index 17ab714..66bb70a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TopicStats.java
@@ -51,6 +51,10 @@ class TopicStats {
     Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
     Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
 
+    // Used for tracking duplicate TYPE definitions
+    static Map<String, String> metricWithTypeDefinition = new HashMap<>();
+
+
     public void reset() {
         subscriptionsCount = 0;
         producersCount = 0;
@@ -74,6 +78,10 @@ class TopicStats {
         entrySizeBuckets.reset();
     }
 
+    static void resetTypes() {
+        metricWithTypeDefinition.clear();
+    }
+
     static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
                                 TopicStats stats) {
         metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount);
@@ -156,8 +164,18 @@ class TopicStats {
         }
     }
 
+    static void metricType(SimpleTextOutputStream stream, String name) {
+
+        if (!metricWithTypeDefinition.containsKey(name)) {
+            metricWithTypeDefinition.put(name, "gauge");
+            stream.write("# TYPE ").write(name).write(" gauge\n");
+        }
+
+    }
+
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
                                String name, double value) {
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
@@ -165,6 +183,7 @@ class TopicStats {
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String name, long value) {
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
@@ -172,6 +191,7 @@ class TopicStats {
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String name, double value) {
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
@@ -179,6 +199,7 @@ class TopicStats {
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String consumerName, long consumerId, String name, long value) {
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
                 .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
@@ -187,6 +208,7 @@ class TopicStats {
 
     private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic, String subscription,
                                String consumerName, long consumerId, String name, double value) {
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace)
                 .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
                 .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId).write("\"} ");
@@ -196,6 +218,7 @@ class TopicStats {
     private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
             String topic,
             String name, String remoteCluster, double value) {
+        metricType(stream, name);
         stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
         stream.write("\",topic=\"").write(topic).write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
         stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index ac8e802..054450b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -21,10 +21,12 @@ package org.apache.pulsar.broker.stats;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
 
 import java.io.ByteArrayOutputStream;
 import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -136,6 +138,101 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         p2.close();
     }
 
+    /** Checks for duplicate type definitions for a metric in the Prometheus metrics output. If the Prometheus parser
+     finds a TYPE definition for the same metric more than once, it errors out:
+     https://github.com/prometheus/prometheus/blob/f04b1b5559a80a4fd1745cf891ce392a056460c9/vendor/github.com/prometheus/common/expfmt/text_parse.go#L499-L502
+     This can happen when including topic metrics, since the same metric is reported multiple times with different labels. For example:
+
+     # TYPE pulsar_subscriptions_count gauge
+     pulsar_subscriptions_count{cluster="standalone"} 0 1556372982118
+     pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/metadata"} 1.0 1556372982118
+     pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/coordinate"} 1.0 1556372982118
+     pulsar_subscriptions_count{cluster="standalone",namespace="public/functions",topic="persistent://public/functions/assignments"} 1.0 1556372982118
+
+     **/
+    // Running the test twice to make sure types are present when generated multiple times
+    @Test(invocationCount = 2)
+    public void testDuplicateMetricTypeDefinitions() throws Exception {
+        Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
+        Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            p1.send(message.getBytes());
+            p2.send(message.getBytes());
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+
+        Map<String, String> typeDefs = new HashMap<String, String>();
+        Map<String, String> metricNames = new HashMap<String, String>();
+
+        Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
+        Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");
+
+        Splitter.on("\n").split(metricsStr).forEach(line -> {
+            if (line.isEmpty()) {
+                return;
+            }
+            if (line.startsWith("#")) {
+                // Check for duplicate type definitions
+                Matcher typeMatcher = typePattern.matcher(line);
+                checkArgument(typeMatcher.matches());
+                String metricName = typeMatcher.group(1);
+                String type = typeMatcher.group(2);
+
+                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+                // "Only one TYPE line may exist for a given metric name."
+                if (!typeDefs.containsKey(metricName)) {
+                    typeDefs.put(metricName, type);
+                } else {
+                    fail("Duplicate type definition found for TYPE definition " + metricName);
+                    System.out.println(metricsStr);
+
+                }
+                // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
+                // "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
+                if (metricNames.containsKey(metricName)) {
+                    System.out.println(metricsStr);
+                    fail("TYPE definition for " + metricName + " appears after first sample");
+
+                }
+            } else {
+                Matcher metricMatcher = metricNamePattern.matcher(line);
+                checkArgument(metricMatcher.matches());
+                String metricName = metricMatcher.group(1);
+                metricNames.put(metricName, metricName);
+            }
+        });
+
+        // Metrics with no type definition
+        for (String metricName : metricNames.keySet()) {
+
+            if (!typeDefs.containsKey(metricName)) {
+                // This may be OK if this is a _sum or _count metric from a summary
+                if(metricName.endsWith("_sum")) {
+                    String summaryMetricName = metricName.substring(0, metricName.indexOf("_sum"));
+                    if (!typeDefs.containsKey(summaryMetricName)) {
+                        fail("Metric " + metricName + " does not have a corresponding summary type definition");
+                    }
+                } else if (metricName.endsWith("_count")) {
+                    String summaryMetricName = metricName.substring(0, metricName.indexOf("_count"));
+                    if (!typeDefs.containsKey(summaryMetricName)) {
+                        fail("Metric " + metricName + " does not have a corresponding summary type definition");
+                    }
+                } else {
+                    fail("Metric " + metricName + " does not have a type definition");
+                }
+
+            }
+        }
+
+        p1.close();
+        p2.close();
+    }
+
+
     /**
      * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
      */