You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2021/09/09 07:50:57 UTC

[hadoop] branch trunk updated: HADOOP-17804. Expose prometheus metrics only after a flush and dedupe with tag values (#3369)

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4ced012  HADOOP-17804. Expose prometheus metrics only after a flush and dedupe with tag values (#3369)
4ced012 is described below

commit 4ced012f3301d0848680fdf0ef2972da9b3e1298
Author: Adam Binford <ad...@gmail.com>
AuthorDate: Thu Sep 9 03:49:40 2021 -0400

    HADOOP-17804. Expose prometheus metrics only after a flush and dedupe with tag values (#3369)
    
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
---
 .../metrics2/sink/PrometheusMetricsSink.java       |  94 ++++++++++-------
 .../metrics2/sink/TestPrometheusMetricsSink.java   | 114 ++++++++++++++++++++-
 2 files changed, 168 insertions(+), 40 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java
index 10df769..db2ae85 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/PrometheusMetricsSink.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.metrics2.MetricsTag;
 
 import java.io.IOException;
 import java.io.Writer;
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
@@ -42,7 +43,10 @@ public class PrometheusMetricsSink implements MetricsSink {
   /**
    * Cached output lines for each metrics.
    */
-  private final Map<String, String> metricLines = new ConcurrentHashMap<>();
+  private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetrics =
+      new ConcurrentHashMap<>();
+  private Map<String, Map<Collection<MetricsTag>, AbstractMetric>> nextPromMetrics =
+      new ConcurrentHashMap<>();
 
   private static final Pattern SPLIT_PATTERN =
       Pattern.compile("(?<!(^|[A-Z_]))(?=[A-Z])|(?<!^)(?=[A-Z][a-z])");
@@ -53,42 +57,16 @@ public class PrometheusMetricsSink implements MetricsSink {
 
   @Override
   public void putMetrics(MetricsRecord metricsRecord) {
-    for (AbstractMetric metrics : metricsRecord.metrics()) {
-      if (metrics.type() == MetricType.COUNTER
-          || metrics.type() == MetricType.GAUGE) {
+    for (AbstractMetric metric : metricsRecord.metrics()) {
+      if (metric.type() == MetricType.COUNTER
+          || metric.type() == MetricType.GAUGE) {
 
         String key = prometheusName(
-            metricsRecord.name(), metrics.name());
-
-        StringBuilder builder = new StringBuilder();
-        builder.append("# TYPE ")
-            .append(key)
-            .append(" ")
-            .append(metrics.type().toString().toLowerCase())
-            .append("\n")
-            .append(key)
-            .append("{");
-        String sep = "";
-
-        //add tags
-        for (MetricsTag tag : metricsRecord.tags()) {
-          String tagName = tag.name().toLowerCase();
-
-          //ignore specific tag which includes sub-hierarchy
-          if (!tagName.equals("numopenconnectionsperuser")) {
-            builder.append(sep)
-                .append(tagName)
-                .append("=\"")
-                .append(tag.value())
-                .append("\"");
-            sep = ",";
-          }
-        }
-        builder.append("} ");
-        builder.append(metrics.value());
-        builder.append("\n");
-        metricLines.put(key, builder.toString());
+            metricsRecord.name(), metric.name());
 
+        nextPromMetrics.computeIfAbsent(key,
+            any -> new ConcurrentHashMap<>())
+            .put(metricsRecord.tags(), metric);
       }
     }
   }
@@ -108,17 +86,55 @@ public class PrometheusMetricsSink implements MetricsSink {
 
   @Override
   public void flush() {
-
+    promMetrics = nextPromMetrics;
+    nextPromMetrics = new ConcurrentHashMap<>();
   }
 
   @Override
-  public void init(SubsetConfiguration subsetConfiguration) {
-
+  public void init(SubsetConfiguration conf) {
   }
 
   public void writeMetrics(Writer writer) throws IOException {
-    for (String line : metricLines.values()) {
-      writer.write(line);
+    for (Map.Entry<String, Map<Collection<MetricsTag>, AbstractMetric>> promMetric :
+        promMetrics.entrySet()) {
+      AbstractMetric firstMetric = promMetric.getValue().values().iterator().next();
+
+      StringBuilder builder = new StringBuilder();
+      builder.append("# HELP ")
+          .append(promMetric.getKey())
+          .append(" ")
+          .append(firstMetric.description())
+          .append("\n")
+          .append("# TYPE ")
+          .append(promMetric.getKey())
+          .append(" ")
+          .append(firstMetric.type().toString().toLowerCase())
+          .append("\n");
+
+      for (Map.Entry<Collection<MetricsTag>, AbstractMetric> metric :
+          promMetric.getValue().entrySet()) {
+        builder.append(promMetric.getKey())
+            .append("{");
+
+        String sep = "";
+        for (MetricsTag tag : metric.getKey()) {
+          String tagName = tag.name().toLowerCase();
+
+          if (!tagName.equals("numopenconnectionsperuser")) {
+            builder.append(sep)
+                .append(tagName)
+                .append("=\"")
+                .append(tag.value())
+                .append("\"");
+            sep = ",";
+          }
+        }
+        builder.append("} ");
+        builder.append(metric.getValue().value());
+        builder.append("\n");
+      }
+
+      writer.write(builder.toString());
     }
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestPrometheusMetricsSink.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestPrometheusMetricsSink.java
index 3fc4aa4..df13191 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestPrometheusMetricsSink.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/sink/TestPrometheusMetricsSink.java
@@ -24,6 +24,7 @@ import java.io.OutputStreamWriter;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.annotation.Metric.Type;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 
@@ -48,7 +49,6 @@ public class TestPrometheusMetricsSink {
     TestMetrics testMetrics = metrics
         .register("TestMetrics", "Testing metrics", new TestMetrics());
 
-    metrics.start();
     testMetrics.numBucketCreateFails.incr();
     metrics.publishMetricsNow();
     ByteArrayOutputStream stream = new ByteArrayOutputStream();
@@ -67,6 +67,104 @@ public class TestPrometheusMetricsSink {
             "test_metrics_num_bucket_create_fails{context=\"dfs\"")
     );
 
+    metrics.unregisterSource("TestMetrics");
+    metrics.stop();
+    metrics.shutdown();
+  }
+
+  /**
+   * Fix for HADOOP-17804, make sure Prometheus metrics get deduped based on metric
+   * and tags, not just the metric.
+   */
+  @Test
+  public void testPublishMultiple() throws IOException {
+    //GIVEN
+    MetricsSystem metrics = DefaultMetricsSystem.instance();
+
+    metrics.init("test");
+    PrometheusMetricsSink sink = new PrometheusMetricsSink();
+    metrics.register("Prometheus", "Prometheus", sink);
+    TestMetrics testMetrics1 = metrics
+        .register("TestMetrics1", "Testing metrics", new TestMetrics("1"));
+    TestMetrics testMetrics2 = metrics
+        .register("TestMetrics2", "Testing metrics", new TestMetrics("2"));
+
+    testMetrics1.numBucketCreateFails.incr();
+    testMetrics2.numBucketCreateFails.incr();
+    metrics.publishMetricsNow();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
+
+    //WHEN
+    sink.writeMetrics(writer);
+    writer.flush();
+
+    //THEN
+    String writtenMetrics = stream.toString(UTF_8.name());
+    System.out.println(writtenMetrics);
+    Assert.assertTrue(
+        "The expected first metric line is missing from prometheus metrics output",
+        writtenMetrics.contains(
+            "test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
+    );
+    Assert.assertTrue(
+        "The expected second metric line is missing from prometheus metrics output",
+        writtenMetrics.contains(
+            "test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
+    );
+
+    metrics.unregisterSource("TestMetrics1");
+    metrics.unregisterSource("TestMetrics2");
+    metrics.stop();
+    metrics.shutdown();
+  }
+
+  /**
+   * Fix for HADOOP-17804, make sure Prometheus metrics start fresh after each flush.
+   */
+  @Test
+  public void testPublishFlush() throws IOException {
+    //GIVEN
+    MetricsSystem metrics = DefaultMetricsSystem.instance();
+
+    metrics.init("test");
+    PrometheusMetricsSink sink = new PrometheusMetricsSink();
+    metrics.register("Prometheus", "Prometheus", sink);
+    TestMetrics testMetrics = metrics
+        .register("TestMetrics", "Testing metrics", new TestMetrics("1"));
+
+    testMetrics.numBucketCreateFails.incr();
+    metrics.publishMetricsNow();
+
+    metrics.unregisterSource("TestMetrics");
+    testMetrics = metrics
+        .register("TestMetrics", "Testing metrics", new TestMetrics("2"));
+
+    testMetrics.numBucketCreateFails.incr();
+    metrics.publishMetricsNow();
+
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    OutputStreamWriter writer = new OutputStreamWriter(stream, UTF_8);
+
+    //WHEN
+    sink.writeMetrics(writer);
+    writer.flush();
+
+    //THEN
+    String writtenMetrics = stream.toString(UTF_8.name());
+    System.out.println(writtenMetrics);
+    Assert.assertFalse(
+        "The first metric should not exist after flushing",
+        writtenMetrics.contains(
+            "test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue1\"")
+    );
+    Assert.assertTrue(
+        "The expected metric line is missing from prometheus metrics output",
+        writtenMetrics.contains(
+            "test_metrics_num_bucket_create_fails{context=\"dfs\",testtag=\"testTagValue2\"")
+    );
+
+    metrics.unregisterSource("TestMetrics");
     metrics.stop();
     metrics.shutdown();
   }
@@ -126,6 +224,20 @@ public class TestPrometheusMetricsSink {
    */
   @Metrics(about = "Test Metrics", context = "dfs")
   private static class TestMetrics {
+    private String id;
+
+    TestMetrics() {
+      this("1");
+    }
+
+    TestMetrics(String id) {
+      this.id = id;
+    }
+
+    @Metric(value={"testTag", ""}, type=Type.TAG)
+    String testTag1() {
+      return "testTagValue" + id;
+    }
 
     @Metric
     private MutableCounterLong numBucketCreateFails;

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org