You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2020/04/02 05:14:38 UTC
[incubator-heron] branch master updated: Support custom metrics
rules for PrometheusSink (#3493)
This is an automated email from the ASF dual-hosted git repository.
nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new b069e45 Support custom metrics rules for PrometheusSink (#3493)
b069e45 is described below
commit b069e450a78c2f77ba5427c3f1aceda1e881e9f2
Author: choi se <th...@gmail.com>
AuthorDate: Thu Apr 2 14:14:28 2020 +0900
Support custom metrics rules for PrometheusSink (#3493)
* Support Java 11
* config travis to use oracle jdk 11
* Java 11 support (#3399)
* Support Java 11
* config travis to use oracle jdk 11
* Add check jdk version
* Fix command arguments.
Change insert gc_options
Update list
Fix gc-logging
* Add missing parameter
* typo
* Add pause time
* wip
* Support jmx_exporter format configuration.
* Fix checkstyle
* Remove unused
* Java 11 support (#3399)
* Support Java 11
* config travis to use oracle jdk 11
* Add check jdk version
* Fix command arguments.
Change insert gc_options
Update list
Fix gc-logging
* wip
* Support jmx_exporter format configuration.
* Fix checkstyle
* Remove unused
* Update kafkaOffset metrics
* Add Rules
* Make log/sink/consume Streamlet component support setName and setNumPartitions (#3459)
* Patch to fix cppcheck with newer glibc (#3471)
* Add documents for setting up a docker based development environment (#3475)
* Improve concurrency for needed parts. (#3107)
* Change concurrent Map
* Change concurrent Map
* HashMap changes for unneeded parts.
* HashMap changes for unneeded parts.
* Review changes
* Changes HashMap for unneeded parts.
* Improve concurrency for needed parts.
* Remove unused imports.
* Remove unused imports.
* Remove unused imports.
* Fix NPE
(cherry picked from commit 545d3814b315c29d3e396309a2ededaad193ec32)
* Fix WhitespaceAround
* Add dummy Object
* Fix ConstantName
(cherry picked from commit 8d6d5067072e92d6e276f93e18297ddedc647c6c)
* Update kafkaOffset metrics
* Add Rules
* Update line is longer than 100 characters
* Update line is longer than 100 characters
* Add attrNameSnakeCase or other metrics fix
* fix checkstyle
Co-authored-by: Ning Wang <wa...@gmail.com>
Co-authored-by: Ning Wang <nw...@twitter.com>
Co-authored-by: Nicholas Nezis <ni...@gmail.com>
---
.../heron/metricsmgr/sink/PrometheusSink.java | 236 ++++++++++++++++++---
.../heron/metricsmgr/sink/PrometheusSinkTests.java | 124 ++++++++++-
2 files changed, 324 insertions(+), 36 deletions(-)
diff --git a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java
index 0db442a..56a244b 100644
--- a/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java
+++ b/heron/metricsmgr/src/java/org/apache/heron/metricsmgr/sink/PrometheusSink.java
@@ -20,10 +20,17 @@
package org.apache.heron.metricsmgr.sink;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.cache.Cache;
@@ -33,6 +40,9 @@ import org.apache.heron.spi.metricsmgr.metrics.MetricsInfo;
import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord;
import org.apache.heron.spi.metricsmgr.sink.SinkContext;
+import static java.lang.String.format;
+import static org.apache.heron.metricsmgr.sink.PrometheusSink.Prometheus.sanitizeMetricName;
+
/**
 * A web sink that exposes an endpoint that Prometheus can scrape
*
@@ -57,6 +67,7 @@ public class PrometheusSink extends AbstractWebSink {
// This is the cache that is used to serve the metrics
private Cache<String, Map<String, Double>> metricsCache;
+ private List<Rule> rules = new ArrayList<Rule>();
private String cluster;
private String role;
@@ -66,6 +77,26 @@ public class PrometheusSink extends AbstractWebSink {
super();
}
+ private enum Type {
+ COUNTER,
+ GAUGE,
+ SUMMARY,
+ HISTOGRAM,
+ UNTYPED,
+ }
+
+ private static class Rule {
+ public Pattern pattern;
+ public String name;
+ public String value;
+ public Double valueFactor = 1.0;
+ public String help;
+ public boolean attrNameSnakeCase;
+ public Type type = Type.UNTYPED;
+ public ArrayList<String> labelNames;
+ public ArrayList<String> labelValues;
+ }
+
@Override
void initialize(Map<String, Object> configuration, SinkContext context) {
metricsCache = createCache();
@@ -73,6 +104,64 @@ public class PrometheusSink extends AbstractWebSink {
cluster = context.getCluster();
role = context.getRole();
environment = context.getEnvironment();
+
+ if (configuration.containsKey("rules")) {
+ List<Map<String, Object>> configRules = (List<Map<String, Object>>)
+ configuration.get("rules");
+ for (Map<String, Object> ruleObject : configRules) {
+ Rule rule = new Rule();
+ rules.add(rule);
+ if (ruleObject.containsKey("pattern")) {
+ rule.pattern = Pattern.compile("^.*(?:" + (String) ruleObject.get("pattern") + ").*$");
+ }
+ if (ruleObject.containsKey("name")) {
+ rule.name = (String) ruleObject.get("name");
+ }
+ if (ruleObject.containsKey("value")) {
+ rule.value = String.valueOf(ruleObject.get("value"));
+ }
+ if (ruleObject.containsKey("valueFactor")) {
+ String valueFactor = String.valueOf(ruleObject.get("valueFactor"));
+ try {
+ rule.valueFactor = Double.valueOf(valueFactor);
+ } catch (NumberFormatException e) {
+ // use default value
+ }
+ }
+ if (ruleObject.containsKey("attrNameSnakeCase")) {
+ rule.attrNameSnakeCase = (Boolean) ruleObject.get("attrNameSnakeCase");
+ }
+ if (ruleObject.containsKey("type")) {
+ rule.type = Type.valueOf((String) ruleObject.get("type"));
+ }
+ if (ruleObject.containsKey("help")) {
+ rule.help = (String) ruleObject.get("help");
+ }
+ if (ruleObject.containsKey("labels")) {
+ TreeMap labels = new TreeMap((Map<String, Object>) ruleObject.get("labels"));
+ rule.labelNames = new ArrayList<String>();
+ rule.labelValues = new ArrayList<String>();
+ for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) labels
+ .entrySet()) {
+ rule.labelNames.add(entry.getKey());
+ rule.labelValues.add((String) entry.getValue());
+ }
+ }
+
+ // Validation.
+ if ((rule.labelNames != null || rule.help != null) && rule.name == null) {
+ throw new IllegalArgumentException("Must provide name, if help or labels are given: "
+ + ruleObject);
+ }
+ if (rule.name != null && rule.pattern == null) {
+ throw new IllegalArgumentException("Must provide pattern, if name is given: "
+ + ruleObject);
+ }
+ }
+ } else {
+ // Default to a single default rule.
+ rules.add(new Rule());
+ }
}
@Override
@@ -82,6 +171,9 @@ public class PrometheusSink extends AbstractWebSink {
final StringBuilder sb = new StringBuilder();
metrics.forEach((String source, Map<String, Double> sourceMetrics) -> {
+ // Map the labels.
+ final Map<String, String> labelKV = new TreeMap<String, String>();
+
String[] sources = source.split("/");
String topology = sources[0];
String component = sources[1];
@@ -96,6 +188,18 @@ public class PrometheusSink extends AbstractWebSink {
final String clusterRoleEnv = hasClusterRoleEnvironment(c, r, e)
? String.format("%s/%s/%s", c, r, e) : null;
+ labelKV.put("topology", topology);
+ labelKV.put("component", component);
+ labelKV.put("instance_id", instance);
+
+ if (clusterRoleEnv != null) {
+ labelKV.put("cluster_role_env", clusterRoleEnv);
+ }
+
+ if (componentType != null) {
+ labelKV.put("component_type", componentType);
+ }
+
sourceMetrics.forEach((String metric, Double value) -> {
        // some stream manager metrics in heron contain an instance id as part of the metric name
@@ -104,46 +208,79 @@ public class PrometheusSink extends AbstractWebSink {
// __time_spent_back_pressure_by_compid/container_1_exclaim1_1
// TODO convert to small classes for less string manipulation
final String metricName;
- final String metricInstanceId;
if (componentIsStreamManger) {
final boolean metricHasInstanceId = metric.contains("_by_");
final String[] metricParts = metric.split("/");
if (metricHasInstanceId && metricParts.length == 3) {
- metricName = String.format("%s_%s", metricParts[0], metricParts[2]);
- metricInstanceId = metricParts[1];
+ metricName = splitTargetInstance(metricParts[0], metricParts[2], labelKV);
+ labelKV.put("metric_instance_id", metricParts[1]);
} else if (metricHasInstanceId && metricParts.length == 2) {
- metricName = metricParts[0];
- metricInstanceId = metricParts[1];
+ metricName = splitTargetInstance(metricParts[0], null, labelKV);
+ labelKV.put("metric_instance_id", metricParts[1]);
+ } else if (metricParts.length == 2) {
+ metricName = splitTargetInstance(metricParts[0], metricParts[1], labelKV);
} else {
- metricName = metric;
- metricInstanceId = null;
+ metricName = splitTargetInstance(metric, null, labelKV);
}
-
} else {
- metricName = metric;
- metricInstanceId = null;
- }
-
- String exportedMetricName = String.format("%s_%s", HERON_PREFIX,
- metricName.replace("__", "").toLowerCase());
- sb.append(Prometheus.sanitizeMetricName(exportedMetricName))
- .append("{")
- .append("topology=\"").append(topology).append("\",")
- .append("component=\"").append(component).append("\",")
- .append("instance_id=\"").append(instance).append("\"");
-
- if (clusterRoleEnv != null) {
- sb.append(",cluster_role_env=\"").append(clusterRoleEnv).append("\"");
- }
-
- if (componentType != null) {
- sb.append(",component_type=\"").append(componentType).append("\"");
- }
-
- if (metricInstanceId != null) {
- sb.append(",metric_instance_id=\"").append(metricInstanceId).append("\"");
+ final AtomicReference<String> name = new AtomicReference<>(sanitizeMetricName(metric));
+ rules.forEach(rule -> {
+ String ruleName = name.get();
+ Matcher matcher = null;
+ if (rule.pattern != null) {
+ matcher = rule.pattern.matcher(metric);
+ if (!matcher.matches()) {
+ return;
+ }
+ }
+
+ // If there's no name provided, use default export format.
+ if (rule.name == null || rule.name.isEmpty()) {
+ // nothing
+ } else {
+ // Matcher is set below here due to validation in the constructor.
+ ruleName = sanitizeMetricName(matcher.replaceAll(rule.name));
+ if (ruleName.isEmpty()) {
+ return;
+ }
+ }
+ if (rule.attrNameSnakeCase) {
+ name.set(toSnakeAndLowerCase(ruleName));
+ } else {
+ name.set(ruleName.toLowerCase());
+ }
+ if (rule.labelNames != null) {
+ for (int i = 0; i < rule.labelNames.size(); i++) {
+ final String unsafeLabelName = rule.labelNames.get(i);
+ final String labelValReplacement = rule.labelValues.get(i);
+ String labelName = sanitizeMetricName(matcher.replaceAll(unsafeLabelName));
+ String labelValue = matcher.replaceAll(labelValReplacement);
+ labelName = labelName.toLowerCase();
+ if (!labelName.isEmpty() && !labelValue.isEmpty()) {
+ labelKV.put(labelName, labelValue);
+ }
+ }
+ }
+ });
+ metricName = name.get();
}
+ // TODO Type, Help
+ String exportedMetricName = format("%s_%s", HERON_PREFIX,
+ metricName
+ .replace("__", "")
+ .toLowerCase());
+ sb.append(sanitizeMetricName(exportedMetricName))
+ .append("{");
+ final AtomicBoolean isFirst = new AtomicBoolean(true);
+ labelKV.forEach((k, v) -> {
+ // Add labels
+ if (!isFirst.get()) {
+ sb.append(',');
+ }
+ sb.append(format("%s=\"%s\"", k, v));
+ isFirst.set(false);
+ });
sb.append("} ")
.append(Prometheus.doubleToGoString(value))
.append(" ").append(currentTimeMillis())
@@ -154,6 +291,45 @@ public class PrometheusSink extends AbstractWebSink {
return sb.toString().getBytes();
}
+ private static final Pattern SPLIT_TARGET = Pattern.compile("__(?<name>\\w+)"
+ + "_(?<target>(?<instance>\\w+)-\\d+)");
+ private static final Pattern DIGIT = Pattern.compile("[0-9]+");
+
+ private String splitTargetInstance(String part1, String part2, Map<String, String> labelKV) {
+ if (part2 != null) {
+ if (DIGIT.matcher(part2).matches()) {
+ labelKV.put("metric_instance_id", part2);
+ return part1;
+ }
+ final Matcher m = SPLIT_TARGET.matcher(part1);
+ if (m.matches()) {
+ labelKV.put("metric_instance_id", m.group("target"));
+ return String.format("%s_%s_%s", m.group("name"), m.group("instance"), part2);
+ }
+ return String.format("%s_%s", part1, part2);
+ }
+ return part1;
+ }
+
+ static String toSnakeAndLowerCase(String attrName) {
+ if (attrName == null || attrName.isEmpty()) {
+ return attrName;
+ }
+ char firstChar = attrName.subSequence(0, 1).charAt(0);
+ boolean prevCharIsUpperCaseOrUnderscore = Character.isUpperCase(firstChar) || firstChar == '_';
+ StringBuilder resultBuilder = new StringBuilder(attrName.length())
+ .append(Character.toLowerCase(firstChar));
+ for (char attrChar : attrName.substring(1).toCharArray()) {
+ boolean charIsUpperCase = Character.isUpperCase(attrChar);
+ if (!prevCharIsUpperCaseOrUnderscore && charIsUpperCase) {
+ resultBuilder.append("_");
+ }
+ resultBuilder.append(Character.toLowerCase(attrChar));
+ prevCharIsUpperCaseOrUnderscore = charIsUpperCase || attrChar == '_';
+ }
+ return resultBuilder.toString();
+ }
+
@Override
public void processRecord(MetricsRecord record) {
final String[] sources = MetricsUtil.splitRecordSource(record);
diff --git a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java
index 318d8ec..b73b1dd 100644
--- a/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java
+++ b/heron/metricsmgr/tests/java/org/apache/heron/metricsmgr/sink/PrometheusSinkTests.java
@@ -28,6 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -81,6 +84,45 @@ public class PrometheusSinkTests {
Mockito.when(context.getTopologyName()).thenReturn("testTopology");
Mockito.when(context.getSinkId()).thenReturn("testId");
+ /*
+ # example: metrics.yaml
+ rules:
+ - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
+ name: kafka_$1_$2_$3_total
+ attrNameSnakeCase: true
+ type: COUNTER
+ labels:
+ "$4": "$5"
+ type: COUNTER
+ */
+ /*
+ example: metrics
+ kafkaOffset/nginx-lfp-beacon/totalSpoutLag
+ kafkaOffset/lads_event_meta_backfill_data/partition_10/spoutLag
+ */
+ List<Map<String, Object>> rules = Lists.newArrayList();
+ defaultConf.put("rules", rules);
+ Map<String, Object> rule1 = Maps.newHashMap();
+ Map<String, Object> labels1 = Maps.newHashMap();
+ rules.add(rule1);
+ rule1.put("pattern", "kafkaOffset/(.+)/(.+)");
+ rule1.put("name", "kafka_offset_$2");
+ rule1.put("type", "COUNTER");
+ rule1.put("attrNameSnakeCase", true);
+ rule1.put("labels", labels1);
+ labels1.put("topic", "$1");
+
+ Map<String, Object> rule2 = Maps.newHashMap();
+ Map<String, Object> labels2 = Maps.newHashMap();
+ rules.add(rule2);
+ rule2.put("pattern", "kafkaOffset/(.+)/partition_(\\d+)/(.+)");
+ rule2.put("name", "kafka_offset_partition_$3");
+ rule2.put("type", "COUNTER");
+ rule2.put("labels", labels2);
+ rule2.put("attrNameSnakeCase", true);
+ labels2.put("topic", "$1");
+ labels2.put("partition", "$2");
+
Iterable<MetricsInfo> infos = Arrays.asList(new MetricsInfo("metric_1", "1.0"),
new MetricsInfo("metric_2", "2.0"));
@@ -133,7 +175,9 @@ public class PrometheusSinkTests {
public void testResponseWhenMetricNamesHaveAnInstanceId() throws IOException {
Iterable<MetricsInfo> infos = Arrays.asList(
new MetricsInfo("__connection_buffer_by_instanceid/container_1_word_5/packets", "1.0"),
- new MetricsInfo("__time_spent_back_pressure_by_compid/container_1_exclaim1_1", "1.0")
+ new MetricsInfo("__time_spent_back_pressure_by_compid/container_1_exclaim1_1", "1.0"),
+ new MetricsInfo("__client_stmgr-92/__ack_tuples_to_stmgrs", "1.0"),
+ new MetricsInfo("__instance_bytes_received/1", "1.0")
);
records = Arrays.asList(
@@ -154,7 +198,59 @@ public class PrometheusSinkTests {
"container_1_word_5", "1.0"),
createMetric(topology, "__stmgr__", "stmgr-1",
"time_spent_back_pressure_by_compid",
- "container_1_exclaim1_1", "1.0")
+ "container_1_exclaim1_1", "1.0"),
+ createMetric(topology, "__stmgr__", "stmgr-1",
+ "client_stmgr_ack_tuples_to_stmgrs", "stmgr-92", "1.0"),
+ createMetric(topology, "__stmgr__", "stmgr-1",
+ "instance_bytes_received", "1", "1.0")
+ );
+
+ final Set<String> generatedLines =
+ new HashSet<>(Arrays.asList(new String(sink.generateResponse()).split("\n")));
+
+ assertEquals(expectedLines.size(), generatedLines.size());
+
+ expectedLines.forEach((String line) -> {
+ assertTrue(generatedLines.contains(line));
+ });
+ }
+
+ @Test
+ public void testApacheStormKafkaMetrics() throws IOException {
+ Iterable<MetricsInfo> infos = Arrays.asList(
+ new MetricsInfo("kafkaOffset/event_data/partition_0/spoutLag", "1.0"),
+ new MetricsInfo("kafkaOffset/event_data/partition_10/spoutLag", "1.0"),
+ new MetricsInfo("kafkaOffset/event_data/partition_0/earliestTimeOffset", "1.0"),
+ new MetricsInfo("kafkaOffset/event_data/totalRecordsInPartitions", "1.0"),
+ new MetricsInfo("kafkaOffset/event_data/totalSpoutLag", "1.0"),
+ new MetricsInfo("kafkaOffset/event_data/partition_2/spoutLag", "1.0")
+ );
+
+ records = Arrays.asList(
+ newRecord("shared-aurora-036:31/spout-release-1/container_1_spout-release-1_31",
+ infos, Collections.emptyList())
+ );
+ PrometheusTestSink sink = new PrometheusTestSink();
+ sink.init(defaultConf, context);
+ for (MetricsRecord r : records) {
+ sink.processRecord(r);
+ }
+
+ final String topology = "testTopology";
+
+ final List<String> expectedLines = Arrays.asList(
+ createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+ "kafka_offset_partition_spout_lag", "event_data", "0", "1.0"),
+ createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+ "kafka_offset_partition_spout_lag", "event_data", "10", "1.0"),
+ createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+ "kafka_offset_partition_earliest_time_offset", "event_data", "0", "1.0"),
+ createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+ "kafka_offset_total_records_in_partitions", "event_data", null, "1.0"),
+ createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+ "kafka_offset_total_spout_lag", "event_data", null, "1.0"),
+ createOffsetMetric(topology, "spout-release-1", "container_1_spout-release-1_31",
+ "kafka_offset_partition_spout_lag", "event_data", "2", "1.0")
);
final Set<String> generatedLines =
@@ -195,12 +291,28 @@ public class PrometheusSinkTests {
if (metricNameInstanceId != null) {
return String.format("heron_%s"
- + "{topology=\"%s\",component=\"%s\",instance_id=\"%s\",metric_instance_id=\"%s\"}"
+ + "{component=\"%s\",instance_id=\"%s\",metric_instance_id=\"%s\",topology=\"%s\"}"
+ " %s %d",
- metric, topology, component, instance, metricNameInstanceId, value, NOW);
+ metric, component, instance, metricNameInstanceId, topology, value, NOW);
} else {
- return String.format("heron_%s{topology=\"%s\",component=\"%s\",instance_id=\"%s\"} %s %d",
- metric, topology, component, instance, value, NOW);
+ return String.format("heron_%s{component=\"%s\",instance_id=\"%s\",topology=\"%s\"} %s %d",
+ metric, component, instance, topology, value, NOW);
+ }
+ }
+
+ private String createOffsetMetric(String topology, String component, String instance,
+ String metric, String topic, String partition, String value) {
+
+ if (partition != null) {
+ return String.format("heron_%s"
+ + "{component=\"%s\",instance_id=\"%s\",partition=\"%s\","
+ + "topic=\"%s\",topology=\"%s\"}"
+ + " %s %d",
+ metric, component, instance, partition, topic, topology, value, NOW);
+ } else {
+ return String.format("heron_%s"
+ + "{component=\"%s\",instance_id=\"%s\",topic=\"%s\",topology=\"%s\"} %s %d",
+ metric, component, instance, topic, topology, value, NOW);
}
}