Posted to commits@heron.apache.org by nw...@apache.org on 2020/04/02 05:14:38 UTC

[incubator-heron] branch master updated: Support custom metrics rules for PrometheusSink (#3493)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new b069e45  Support custom metrics rules for PrometheusSink (#3493)
b069e45 is described below

commit b069e450a78c2f77ba5427c3f1aceda1e881e9f2
Author: choi se <th...@gmail.com>
AuthorDate: Thu Apr 2 14:14:28 2020 +0900

    Support custom metrics rules for PrometheusSink (#3493)
    
    * Support Java 11
    
    * config travis to use oracle jdk 11
    
    * Java 11 support (#3399)
    
    * Support Java 11
    
    * config travis to use oracle jdk 11
    
    * Add check jdk version
    
    * Fix command arguments.
    
    Change insert gc_options
    
    Update list
    
    Fix gc-logging
    
    * Add missing parameter
    
    * typo
    
    * Add pause time
    
    * wip
    
    * Support jmx_exporter format configuration.
    
    * Fix checkstyle
    
    * Remove unused
    
    * Java 11 support (#3399)
    
    * Support Java 11
    
    * config travis to use oracle jdk 11
    
    * Add check jdk version
    
    * Fix command arguments.
    
    Change insert gc_options
    
    Update list
    
    Fix gc-logging
    
    * wip
    
    * Support jmx_exporter format configuration.
    
    * Fix checkstyle
    
    * Remove unused
    
    * Update kafkaOffset metrics
    
    * Add Rules
    
    * Make log/sink/consume Streamlet component support setName and setNumPartitions (#3459)
    
    * Patch to fix cppcheck with newer glibc (#3471)
    
    * Add documents for setting up a docker based development environment (#3475)
    
    * Improve concurrency for needed parts. (#3107)
    
    * Change concurrent Map
    
    * Change concurrent Map
    
    * HashMap changes for unneeded parts.
    
    * HashMap changes for unneeded parts.
    
    * Review changes
    
    * Changes HashMap for unneeded parts.
    
    * Improve concurrency for needed parts.
    
    * Remove unused imports.
    
    * Remove unused imports.
    
    * Remove unused imports.
    
    * Fix NPE
    
    (cherry picked from commit 545d3814b315c29d3e396309a2ededaad193ec32)
    
    * Fix WhitespaceAround
    
    * Add dummy Object
    
    * Fix ConstantName
    
    (cherry picked from commit 8d6d5067072e92d6e276f93e18297ddedc647c6c)
    
    * Update kafkaOffset metrics
    
    * Add Rules
    
    * Update line is longer than 100 characters
    
    * Update line is longer than 100 characters
    
    * Add attrNameSnakeCase or other metrics fix
    
    * fix checkstyle
    
    Co-authored-by: Ning Wang <wa...@gmail.com>
    Co-authored-by: Ning Wang <nw...@twitter.com>
    Co-authored-by: Nicholas Nezis <ni...@gmail.com>
---
 .../heron/metricsmgr/sink/PrometheusSink.java      | 236 ++++++++++++++++++---
 .../heron/metricsmgr/sink/PrometheusSinkTests.java | 124 ++++++++++-
 2 files changed, 324 insertions(+), 36 deletions(-)
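
The new "rules" sink option mirrors the jmx_exporter rule format: each rule may set
pattern, name, value, valueFactor, type, attrNameSnakeCase, help and labels, which are
the keys the new initialize() code below reads. As a minimal sketch, the two rules
exercised in PrometheusSinkTests.java could be written under the Prometheus sink entry
of a metrics sinks YAML file roughly as follows; the surrounding file layout (sink entry
name and class key) is an assumption for illustration, only the "rules" block is part of
this change:

    prometheus-sink:
      class: "org.apache.heron.metricsmgr.sink.PrometheusSink"
      rules:
        # generic kafkaOffset metrics, e.g. kafkaOffset/event_data/totalSpoutLag
        - pattern: 'kafkaOffset/(.+)/(.+)'
          name: kafka_offset_$2
          type: COUNTER
          attrNameSnakeCase: true
          labels:
            topic: "$1"
        # per-partition metrics, e.g. kafkaOffset/event_data/partition_0/spoutLag;
        # every matching rule is applied in order and the last match sets the name,
        # so the more specific rule comes after the generic one
        - pattern: 'kafkaOffset/(.+)/partition_(\d+)/(.+)'
          name: kafka_offset_partition_$3
          type: COUNTER
          attrNameSnakeCase: true
          labels:
            topic: "$1"
            partition: "$2"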

diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java
index 0db442a..56a244b 100644
--- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java
+++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java
@@ -20,10 +20,17 @@
 package org.apache.heron.metricsmgr.sink;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.cache.Cache;
@@ -33,6 +40,9 @@ import org.apache.heron.spi.metricsmgr.metrics.MetricsInfo;
 import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord;
 import org.apache.heron.spi.metricsmgr.sink.SinkContext;
 
+import static java.lang.String.format;
+import static org.apache.heron.metricsmgr.sink.PrometheusSink.Prometheus.sanitizeMetricName;
+
 /**
  * A web sink that exposes an endpoint that Prometheus can scrape
  *
@@ -57,6 +67,7 @@ public class PrometheusSink extends AbstractWebSink {
 
   // This is the cache that is used to serve the metrics
   private Cache<String, Map<String, Double>> metricsCache;
+  private List<Rule> rules = new ArrayList<Rule>();
 
   private String cluster;
   private String role;
@@ -66,6 +77,26 @@ public class PrometheusSink extends AbstractWebSink {
     super();
   }
 
+  private enum Type {
+    COUNTER,
+    GAUGE,
+    SUMMARY,
+    HISTOGRAM,
+    UNTYPED,
+  }
+
+  private static class Rule {
+    public Pattern pattern;
+    public String name;
+    public String value;
+    public Double valueFactor = 1.0;
+    public String help;
+    public boolean attrNameSnakeCase;
+    public Type type = Type.UNTYPED;
+    public ArrayList<String> labelNames;
+    public ArrayList<String> labelValues;
+  }
+
   @Override
   void initialize(Map<String, Object> configuration, SinkContext context) {
     metricsCache = createCache();
@@ -73,6 +104,64 @@ public class PrometheusSink extends AbstractWebSink {
     cluster = context.getCluster();
     role = context.getRole();
     environment = context.getEnvironment();
+
+    if (configuration.containsKey("rules")) {
+      List<Map<String, Object>> configRules = (List<Map<String, Object>>)
+          configuration.get("rules");
+      for (Map<String, Object> ruleObject : configRules) {
+        Rule rule = new Rule();
+        rules.add(rule);
+        if (ruleObject.containsKey("pattern")) {
+          rule.pattern = Pattern.compile("^.*(?:" + (String) ruleObject.get("pattern") + ").*$");
+        }
+        if (ruleObject.containsKey("name")) {
+          rule.name = (String) ruleObject.get("name");
+        }
+        if (ruleObject.containsKey("value")) {
+          rule.value = String.valueOf(ruleObject.get("value"));
+        }
+        if (ruleObject.containsKey("valueFactor")) {
+          String valueFactor = String.valueOf(ruleObject.get("valueFactor"));
+          try {
+            rule.valueFactor = Double.valueOf(valueFactor);
+          } catch (NumberFormatException e) {
+            // use default value
+          }
+        }
+        if (ruleObject.containsKey("attrNameSnakeCase")) {
+          rule.attrNameSnakeCase = (Boolean) ruleObject.get("attrNameSnakeCase");
+        }
+        if (ruleObject.containsKey("type")) {
+          rule.type = Type.valueOf((String) ruleObject.get("type"));
+        }
+        if (ruleObject.containsKey("help")) {
+          rule.help = (String) ruleObject.get("help");
+        }
+        if (ruleObject.containsKey("labels")) {
+          TreeMap labels = new TreeMap((Map<String, Object>) ruleObject.get("labels"));
+          rule.labelNames = new ArrayList<String>();
+          rule.labelValues = new ArrayList<String>();
+          for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) labels
+              .entrySet()) {
+            rule.labelNames.add(entry.getKey());
+            rule.labelValues.add((String) entry.getValue());
+          }
+        }
+
+        // Validation.
+        if ((rule.labelNames != null || rule.help != null) && rule.name == null) {
+          throw new IllegalArgumentException("Must provide name, if help or labels are given: "
+              + ruleObject);
+        }
+        if (rule.name != null && rule.pattern == null) {
+          throw new IllegalArgumentException("Must provide pattern, if name is given: "
+              + ruleObject);
+        }
+      }
+    } else {
+      // Default to a single default rule.
+      rules.add(new Rule());
+    }
   }
 
   @Override
@@ -82,6 +171,9 @@ public class PrometheusSink extends AbstractWebSink {
     final StringBuilder sb = new StringBuilder();
 
     metrics.forEach((String source, Map<String, Double> sourceMetrics) -> {
+      // Map the labels.
+      final Map<String, String> labelKV = new TreeMap<String, String>();
+
       String[] sources = source.split("/");
       String topology = sources[0];
       String component = sources[1];
@@ -96,6 +188,18 @@ public class PrometheusSink extends AbstractWebSink {
       final String clusterRoleEnv = hasClusterRoleEnvironment(c, r, e)
           ? String.format("%s/%s/%s", c, r, e) : null;
 
+      labelKV.put("topology", topology);
+      labelKV.put("component", component);
+      labelKV.put("instance_id", instance);
+
+      if (clusterRoleEnv != null) {
+        labelKV.put("cluster_role_env", clusterRoleEnv);
+      }
+
+      if (componentType != null) {
+        labelKV.put("component_type", componentType);
+      }
+
       sourceMetrics.forEach((String metric, Double value) -> {
 
         // some stream manager metrics in heron contain an instance id as part of the metric name
@@ -104,46 +208,79 @@ public class PrometheusSink extends AbstractWebSink {
         // __time_spent_back_pressure_by_compid/container_1_exclaim1_1
         // TODO convert to small classes for less string manipulation
         final String metricName;
-        final String metricInstanceId;
         if (componentIsStreamManger) {
           final boolean metricHasInstanceId = metric.contains("_by_");
           final String[] metricParts = metric.split("/");
           if (metricHasInstanceId && metricParts.length == 3) {
-            metricName = String.format("%s_%s", metricParts[0], metricParts[2]);
-            metricInstanceId = metricParts[1];
+            metricName = splitTargetInstance(metricParts[0], metricParts[2], labelKV);
+            labelKV.put("metric_instance_id", metricParts[1]);
           } else if (metricHasInstanceId && metricParts.length == 2) {
-            metricName = metricParts[0];
-            metricInstanceId = metricParts[1];
+            metricName = splitTargetInstance(metricParts[0], null, labelKV);
+            labelKV.put("metric_instance_id", metricParts[1]);
+          } else if (metricParts.length == 2) {
+            metricName = splitTargetInstance(metricParts[0], metricParts[1], labelKV);
           } else {
-            metricName = metric;
-            metricInstanceId = null;
+            metricName = splitTargetInstance(metric, null, labelKV);
           }
-
         } else {
-          metricName = metric;
-          metricInstanceId = null;
-        }
-
-        String exportedMetricName = String.format("%s_%s", HERON_PREFIX,
-            metricName.replace("__", "").toLowerCase());
-        sb.append(Prometheus.sanitizeMetricName(exportedMetricName))
-            .append("{")
-            .append("topology=\"").append(topology).append("\",")
-            .append("component=\"").append(component).append("\",")
-            .append("instance_id=\"").append(instance).append("\"");
-
-        if (clusterRoleEnv != null) {
-          sb.append(",cluster_role_env=\"").append(clusterRoleEnv).append("\"");
-        }
-
-        if (componentType != null) {
-          sb.append(",component_type=\"").append(componentType).append("\"");
-        }
-
-        if (metricInstanceId != null) {
-          sb.append(",metric_instance_id=\"").append(metricInstanceId).append("\"");
+          final AtomicReference<String> name = new AtomicReference<>(sanitizeMetricName(metric));
+          rules.forEach(rule -> {
+            String ruleName = name.get();
+            Matcher matcher = null;
+            if (rule.pattern != null) {
+              matcher = rule.pattern.matcher(metric);
+              if (!matcher.matches()) {
+                return;
+              }
+            }
+
+            // If there's no name provided, use default export format.
+            if (rule.name == null || rule.name.isEmpty()) {
+              // no name template given: keep the default (sanitized) metric name
+            } else {
+              // matcher is non-null here: initialize() rejects rules that set a name without a pattern.
+              ruleName = sanitizeMetricName(matcher.replaceAll(rule.name));
+              if (ruleName.isEmpty()) {
+                return;
+              }
+            }
+            if (rule.attrNameSnakeCase) {
+              name.set(toSnakeAndLowerCase(ruleName));
+            } else {
+              name.set(ruleName.toLowerCase());
+            }
+            if (rule.labelNames != null) {
+              for (int i = 0; i < rule.labelNames.size(); i++) {
+                final String unsafeLabelName = rule.labelNames.get(i);
+                final String labelValReplacement = rule.labelValues.get(i);
+                String labelName = sanitizeMetricName(matcher.replaceAll(unsafeLabelName));
+                String labelValue = matcher.replaceAll(labelValReplacement);
+                labelName = labelName.toLowerCase();
+                if (!labelName.isEmpty() && !labelValue.isEmpty()) {
+                  labelKV.put(labelName, labelValue);
+                }
+              }
+            }
+          });
+          metricName = name.get();
         }
 
+        // TODO Type, Help
+        String exportedMetricName = format("%s_%s", HERON_PREFIX,
+            metricName
+                .replace("__", "")
+                .toLowerCase());
+        sb.append(sanitizeMetricName(exportedMetricName))
+            .append("{");
+        final AtomicBoolean isFirst = new AtomicBoolean(true);
+        labelKV.forEach((k, v) -> {
+          // Add labels
+          if (!isFirst.get()) {
+            sb.append(',');
+          }
+          sb.append(format("%s=\"%s\"", k, v));
+          isFirst.set(false);
+        });
         sb.append("} ")
             .append(Prometheus.doubleToGoString(value))
             .append(" ").append(currentTimeMillis())
@@ -154,6 +291,45 @@ public class PrometheusSink extends AbstractWebSink {
     return sb.toString().getBytes();
   }
 
+  private static final Pattern SPLIT_TARGET = Pattern.compile("__(?<name>\\w+)"
+      + "_(?<target>(?<instance>\\w+)-\\d+)");
+  private static final Pattern DIGIT = Pattern.compile("[0-9]+");
+
+  private String splitTargetInstance(String part1, String part2, Map<String, String> labelKV) {
+    if (part2 != null) {
+      if (DIGIT.matcher(part2).matches()) {
+        labelKV.put("metric_instance_id", part2);
+        return part1;
+      }
+      final Matcher m = SPLIT_TARGET.matcher(part1);
+      if (m.matches()) {
+        labelKV.put("metric_instance_id", m.group("target"));
+        return String.format("%s_%s_%s", m.group("name"), m.group("instance"), part2);
+      }
+      return String.format("%s_%s", part1, part2);
+    }
+    return part1;
+  }
+
+  static String toSnakeAndLowerCase(String attrName) {
+    if (attrName == null || attrName.isEmpty()) {
+      return attrName;
+    }
+    char firstChar = attrName.subSequence(0, 1).charAt(0);
+    boolean prevCharIsUpperCaseOrUnderscore = Character.isUpperCase(firstChar) || firstChar == '_';
+    StringBuilder resultBuilder = new StringBuilder(attrName.length())
+        .append(Character.toLowerCase(firstChar));
+    for (char attrChar : attrName.substring(1).toCharArray()) {
+      boolean charIsUpperCase = Character.isUpperCase(attrChar);
+      if (!prevCharIsUpperCaseOrUnderscore && charIsUpperCase) {
+        resultBuilder.append("_");
+      }
+      resultBuilder.append(Character.toLowerCase(attrChar));
+      prevCharIsUpperCaseOrUnderscore = charIsUpperCase || attrChar == '_';
+    }
+    return resultBuilder.toString();
+  }
+
   @Override
   public void processRecord(MetricsRecord record) {
     final String[] sources = MetricsUtil.splitRecordSource(record);
diff --git a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java
index 318d8ec..b73b1dd 100644
--- a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java
+++ b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java
@@ -28,6 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -81,6 +84,45 @@ public class PrometheusSinkTests {
     Mockito.when(context.getTopologyName()).thenReturn("testTopology");
     Mockito.when(context.getSinkId()).thenReturn("testId");
 
+    /*
+    # example: metrics.yaml
+    rules:
+      - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
+        name: kafka_$1_$2_$3_total
+        attrNameSnakeCase: true
+        type: COUNTER
+        labels:
+          "$4": "$5"
+    */
+    /*
+    example: metrics
+      kafkaOffset/nginx-lfp-beacon/totalSpoutLag
+      kafkaOffset/lads_event_meta_backfill_data/partition_10/spoutLag
+     */
+    List<Map<String, Object>> rules = Lists.newArrayList();
+    defaultConf.put("rules", rules);
+    Map<String, Object> rule1 = Maps.newHashMap();
+    Map<String, Object> labels1 = Maps.newHashMap();
+    rules.add(rule1);
+    rule1.put("pattern", "kafkaOffset/(.+)/(.+)");
+    rule1.put("name", "kafka_offset_$2");
+    rule1.put("type", "COUNTER");
+    rule1.put("attrNameSnakeCase", true);
+    rule1.put("labels", labels1);
+    labels1.put("topic", "$1");
+
+    Map<String, Object> rule2 = Maps.newHashMap();
+    Map<String, Object> labels2 = Maps.newHashMap();
+    rules.add(rule2);
+    rule2.put("pattern", "kafkaOffset/(.+)/partition_(\\d+)/(.+)");
+    rule2.put("name", "kafka_offset_partition_$3");
+    rule2.put("type", "COUNTER");
+    rule2.put("labels", labels2);
+    rule2.put("attrNameSnakeCase", true);
+    labels2.put("topic", "$1");
+    labels2.put("partition", "$2");
+
     Iterable<MetricsInfo> infos = Arrays.asList(new MetricsInfo("metric_1", "1.0"),
         new MetricsInfo("metric_2", "2.0"));
 
@@ -133,7 +175,9 @@ public class PrometheusSinkTests {
   public void testResponseWhenMetricNamesHaveAnInstanceId() throws IOException {
     Iterable<MetricsInfo> infos = Arrays.asList(
         new MetricsInfo("__connection_buffer_by_instanceid/container_1_word_5/packets", "1.0"),
-        new MetricsInfo("__time_spent_back_pressure_by_compid/container_1_exclaim1_1", "1.0")
+        new MetricsInfo("__time_spent_back_pressure_by_compid/container_1_exclaim1_1", "1.0"),
+        new MetricsInfo("__client_stmgr-92/__ack_tuples_to_stmgrs", "1.0"),
+        new MetricsInfo("__instance_bytes_received/1", "1.0")
     );
 
     records = Arrays.asList(
@@ -154,7 +198,59 @@ public class PrometheusSinkTests {
             "container_1_word_5", "1.0"),
         createMetric(topology, "__stmgr__", "stmgr-1",
             "time_spent_back_pressure_by_compid",
-            "container_1_exclaim1_1", "1.0")
+            "container_1_exclaim1_1", "1.0"),
+        createMetric(topology, "__stmgr__", "stmgr-1",
+            "client_stmgr_ack_tuples_to_stmgrs", "stmgr-92", "1.0"),
+        createMetric(topology, "__stmgr__", "stmgr-1",
+            "instance_bytes_received", "1", "1.0")
+    );
+
+    final Set<String> generatedLines =
+        new HashSet<>(Arrays.asList(new String(sink.generateResponse()).split("\n")));
+
+    assertEquals(expectedLines.size(), generatedLines.size());
+
+    expectedLines.forEach((String line) -> {
+      assertTrue(generatedLines.contains(line));
+    });
+  }
+
+  @Test
+  public void testApacheStormKafkaMetrics() throws IOException {
+    Iterable<MetricsInfo> infos = Arrays.asList(
+        new MetricsInfo("kafkaOffset/event_data/partition_0/spoutLag", "1.0"),
+        new MetricsInfo("kafkaOffset/event_data/partition_10/spoutLag", "1.0"),
+        new MetricsInfo("kafkaOffset/event_data/partition_0/earliestTimeOffset", "1.0"),
+        new MetricsInfo("kafkaOffset/event_data/totalRecordsInPartitions", "1.0"),
+        new MetricsInfo("kafkaOffset/event_data/totalSpoutLag", "1.0"),
+        new MetricsInfo("kafkaOffset/event_data/partition_2/spoutLag", "1.0")
+    );
+
+    records = Arrays.asList(
+        newRecord("shared-aurora-036:31/spout-release-1/container_1_spout-release-1_31",
+            infos, Collections.emptyList())
+    );
+    PrometheusTestSink sink = new PrometheusTestSink();
+    sink.init(defaultConf, context);
+    for (MetricsRecord r : records) {
+      sink.processRecord(r);
+    }
+
+    final String topology = "testTopology";
+
+    final List<String> expectedLines = Arrays.asList(
+        createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+            "kafka_offset_partition_spout_lag", "event_data", "0", "1.0"),
+        createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+            "kafka_offset_partition_spout_lag", "event_data", "10", "1.0"),
+        createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+            "kafka_offset_partition_earliest_time_offset", "event_data", "0", "1.0"),
+        createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+            "kafka_offset_total_records_in_partitions", "event_data", null, "1.0"),
+        createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+            "kafka_offset_total_spout_lag", "event_data", null, "1.0"),
+        createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+            "kafka_offset_partition_spout_lag", "event_data", "2", "1.0")
     );
 
     final Set<String> generatedLines =
@@ -195,12 +291,28 @@ public class PrometheusSinkTests {
 
     if (metricNameInstanceId != null) {
       return String.format("heron_%s"
-              + "{topology=\"%s\",component=\"%s\",instance_id=\"%s\",metric_instance_id=\"%s\"}"
+              + "{component=\"%s\",instance_id=\"%s\",metric_instance_id=\"%s\",topology=\"%s\"}"
               + " %s %d",
-          metric, topology, component, instance, metricNameInstanceId, value, NOW);
+          metric, component, instance, metricNameInstanceId, topology, value, NOW);
     } else {
-      return String.format("heron_%s{topology=\"%s\",component=\"%s\",instance_id=\"%s\"} %s %d",
-          metric, topology, component, instance, value, NOW);
+      return String.format("heron_%s{component=\"%s\",instance_id=\"%s\",topology=\"%s\"} %s %d",
+          metric, component, instance, topology, value, NOW);
+    }
+  }
+
+  private String createOffsetMetric(String topology, String component, String instance,
+                              String metric, String topic, String partition, String value) {
+
+    if (partition != null) {
+      return String.format("heron_%s"
+              + "{component=\"%s\",instance_id=\"%s\",partition=\"%s\","
+              + "topic=\"%s\",topology=\"%s\"}"
+              + " %s %d",
+          metric, component, instance, partition, topic, topology, value, NOW);
+    } else {
+      return String.format("heron_%s"
+              + "{component=\"%s\",instance_id=\"%s\",topic=\"%s\",topology=\"%s\"} %s %d",
+          metric, component, instance, topic, topology, value, NOW);
     }
   }
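
For a concrete end-to-end reading of the rules configured in the test setup, the first
kafkaOffset fixture in testApacheStormKafkaMetrics is expected to come out of
generateResponse() as the line below, reconstructed here from the createOffsetMetric
format string (the trailing timestamp is the test's NOW constant, shown as a placeholder):

    record source: shared-aurora-036:31/spout-release-1/container_1_spout-release-1_31
    raw metric:    kafkaOffset/event_data/partition_0/spoutLag = 1.0

    heron_kafka_offset_partition_spout_lag{component="spout-release-1",instance_id="container_1_spout-release-1_31",partition="0",topic="event_data",topology="testTopology"} 1.0 <NOW>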