You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 10:06:40 UTC
[07/16] flink git commit: [FLINK-6221] Add PrometheusReporter
[FLINK-6221] Add PrometheusReporter
This closes #3833.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb9505dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb9505dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb9505dd
Branch: refs/heads/master
Commit: bb9505ddee6fbf42ef24853dab8475cff9c7948d
Parents: cd45825
Author: Maximilian Bode <ma...@tngtech.com>
Authored: Sat May 6 02:49:42 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:08 2017 +0200
----------------------------------------------------------------------
docs/monitoring/metrics.md | 29 ++
flink-dist/pom.xml | 7 +
flink-dist/src/main/assemblies/opt.xml | 7 +
flink-metrics/flink-metrics-prometheus/pom.xml | 129 +++++++++
.../metrics/prometheus/PrometheusReporter.java | 269 +++++++++++++++++++
.../prometheus/PrometheusReporterTest.java | 190 +++++++++++++
.../src/test/resources/log4j-test.properties | 27 ++
flink-metrics/pom.xml | 1 +
pom.xml | 1 +
9 files changed, 660 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index fc7b595..06ed9ef 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -415,6 +415,35 @@ metrics.reporter.grph.protocol: TCP
{% endhighlight %}
+### Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
+
+In order to use this reporter you must copy `/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+- `port` - (optional) the port the Prometheus exporter listens on, defaults to [9249](https://github.com/prometheus/prometheus/wiki/Default-port-allocations).
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: prom
+metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
+
+{% endhighlight %}
+
+Flink metric types are mapped to Prometheus metric types as follows:
+
+| Flink | Prometheus | Note |
+| --------- |------------|------------------------------------------|
+| Counter | Gauge |Prometheus counters cannot be decremented.|
+| Gauge | Gauge | |
+| Histogram | Summary |Quantiles .5, .75, .95, .98, .99 and .999 |
+| Meter | Gauge |The gauge exports the meter's rate. |
+
+All Flink metrics variables, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>` and `<operator_name>`, are exported to Prometheus as labels.
+
### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 4070dbb..06a656d 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -264,6 +264,13 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-prometheus</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-statsd</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index bb04d28..0afa1d3 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -98,6 +98,13 @@
</file>
<file>
+ <source>../flink-metrics/flink-metrics-prometheus/target/flink-metrics-prometheus-${project.version}-shaded.jar</source>
+ <outputDirectory>opt/</outputDirectory>
+ <destName>flink-metrics-prometheus-${project.version}.jar</destName>
+ <fileMode>0644</fileMode>
+ </file>
+
+ <file>
<source>../flink-metrics/flink-metrics-statsd/target/flink-metrics-statsd-${project.version}.jar</source>
<outputDirectory>opt/</outputDirectory>
<destName>flink-metrics-statsd-${project.version}.jar</destName>
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml b/flink-metrics/flink-metrics-prometheus/pom.xml
new file mode 100644
index 0000000..4884433
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics</artifactId>
+ <version>1.4-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-metrics-prometheus</artifactId>
+ <name>flink-metrics-prometheus</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-annotations</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.prometheus</groupId>
+ <artifactId>simpleclient_servlet</artifactId>
+ <version>${prometheus.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.nanohttpd</groupId>
+ <artifactId>nanohttpd</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.mashape.unirest</groupId>
+ <artifactId>unirest-java</artifactId>
+ <version>1.4.9</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <relocations combine.children="append">
+ <relocation>
+ <pattern>io.prometheus.client</pattern>
+ <shadedPattern>org.apache.flink.shaded.io.prometheus.client</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>fi.iki.elonen</pattern>
+ <shadedPattern>org.apache.flink.shaded.fi.iki.elonen</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
new file mode 100644
index 0000000..d23be8c
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
+ */
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+ private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class);
+
+ static final String ARG_PORT = "port";
+ private static final int DEFAULT_PORT = 9249;
+
+ private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+ private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+ @Override
+ public String filterCharacters(String input) {
+ return replaceInvalidChars(input);
+ }
+ };
+
+ private static final char SCOPE_SEPARATOR = '_';
+ private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
+
+ private PrometheusEndpoint prometheusEndpoint;
+ private final Map<String, Collector> collectorsByMetricName = new HashMap<>();
+
+ @VisibleForTesting
+ static String replaceInvalidChars(final String input) {
+ // https://prometheus.io/docs/instrumenting/writing_exporters/
+ // Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
+ return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+ }
+
+ @Override
+ public void open(MetricConfig config) {
+ int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+ LOG.info("Using port {}.", port);
+ prometheusEndpoint = new PrometheusEndpoint(port);
+ try {
+ prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
+ } catch (IOException e) {
+ final String msg = "Could not start PrometheusEndpoint on port " + port;
+ LOG.warn(msg, e);
+ throw new RuntimeException(msg, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ prometheusEndpoint.stop();
+ CollectorRegistry.defaultRegistry.clear();
+ }
+
+ @Override
+ public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+ final String scope = SCOPE_PREFIX + getLogicalScope(group);
+
+ List<String> dimensionKeys = new LinkedList<>();
+ List<String> dimensionValues = new LinkedList<>();
+ for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
+ final String key = dimension.getKey();
+ dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1)));
+ dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+ }
+
+ final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
+ final String metricIdentifier = group.getMetricIdentifier(metricName);
+ final Collector collector;
+ if (metric instanceof Gauge) {
+ collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+ } else if (metric instanceof Counter) {
+ collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+ } else if (metric instanceof Meter) {
+ collector = createGauge((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+ } else if (metric instanceof Histogram) {
+ collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+ } else {
+ LOG.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+ metric.getClass().getName());
+ return;
+ }
+ collector.register();
+ collectorsByMetricName.put(metricName, collector);
+ }
+
+ @Override
+ public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+ CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName));
+ collectorsByMetricName.remove(metricName);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static String getLogicalScope(MetricGroup group) {
+ return ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+ }
+
+ private Collector createGauge(final Gauge gauge, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
+ return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
+ @Override
+ public double get() {
+ final Object value = gauge.getValue();
+ if (value instanceof Double) {
+ return (double) value;
+ }
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ } else if (value instanceof Boolean) {
+ return ((Boolean) value) ? 1 : 0;
+ } else {
+ LOG.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
+ gauge, value.getClass().getName());
+ return 0;
+ }
+ }
+ });
+ }
+
+ private static Collector createGauge(final Counter counter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
+ return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
+ @Override
+ public double get() {
+ return (double) counter.getCount();
+ }
+ });
+ }
+
+ private Collector createGauge(final Meter meter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
+ return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
+ @Override
+ public double get() {
+ return meter.getRate();
+ }
+ });
+ }
+
+ private static Collector newGauge(String name, String identifier, List<String> labelNames, List<String> labelValues, io.prometheus.client.Gauge.Child child) {
+ return io.prometheus.client.Gauge
+ .build()
+ .name(name)
+ .help(identifier)
+ .labelNames(toArray(labelNames))
+ .create()
+ .setChild(child, toArray(labelValues));
+ }
+
+ private static HistogramSummaryProxy createSummary(final Histogram histogram, final String name, final String identifier, final List<String> dimensionKeys, final List<String> dimensionValues) {
+ return new HistogramSummaryProxy(histogram, name, identifier, dimensionKeys, dimensionValues);
+ }
+
+ static class PrometheusEndpoint extends NanoHTTPD {
+ static final String MIME_TYPE = "plain/text";
+
+ PrometheusEndpoint(int port) {
+ super(port);
+ }
+
+ @Override
+ public Response serve(IHTTPSession session) {
+ if (session.getUri().equals("/metrics")) {
+ StringWriter writer = new StringWriter();
+ try {
+ TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples());
+ } catch (IOException e) {
+ return newFixedLengthResponse(Response.Status.INTERNAL_ERROR, MIME_TYPE, "Unable to output metrics");
+ }
+ return newFixedLengthResponse(Response.Status.OK, TextFormat.CONTENT_TYPE_004, writer.toString());
+ } else {
+ return newFixedLengthResponse(Response.Status.NOT_FOUND, MIME_TYPE, "Not found");
+ }
+ }
+ }
+
+ private static class HistogramSummaryProxy extends Collector {
+ private static final List<Double> QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999);
+
+ private final Histogram histogram;
+ private final String metricName;
+ private final String metricIdentifier;
+ private final List<String> labelNamesWithQuantile;
+ private final List<String> labelValues;
+
+ HistogramSummaryProxy(final Histogram histogram, final String metricName, final String metricIdentifier, final List<String> labelNames, final List<String> labelValues) {
+ this.histogram = histogram;
+ this.metricName = metricName;
+ this.metricIdentifier = metricIdentifier;
+ this.labelNamesWithQuantile = addToList(labelNames, "quantile");
+ this.labelValues = labelValues;
+ }
+
+ @Override
+ public List<MetricFamilySamples> collect() {
+ // We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms,
+ // whose snapshot's values array only holds a sample of recent values).
+
+ final HistogramStatistics statistics = histogram.getStatistics();
+
+ List<MetricFamilySamples.Sample> samples = new LinkedList<>();
+ samples.add(new MetricFamilySamples.Sample(metricName + "_count",
+ labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
+ for (final Double quantile : QUANTILES) {
+ samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile,
+ addToList(labelValues, quantile.toString()),
+ statistics.getQuantile(quantile)));
+ }
+ return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, metricIdentifier, samples));
+ }
+ }
+
+ private static List<String> addToList(List<String> list, String element) {
+ final List<String> result = new ArrayList<>(list);
+ result.add(element);
+ return result;
+ }
+
+ private static String[] toArray(List<String> labelNames) {
+ return labelNames.toArray(new String[labelNames.size()]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
new file mode 100644
index 0000000..83b7b41
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestMeter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.TestLogger;
+
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.Unirest;
+import com.mashape.unirest.http.exceptions.UnirestException;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+
+import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
+import static org.apache.flink.runtime.metrics.scope.ScopeFormat.SCOPE_SEPARATOR;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link PrometheusReporter}.
+ */
+public class PrometheusReporterTest extends TestLogger {
+ private static final int NON_DEFAULT_PORT = 9429;
+
+ private static final String HOST_NAME = "hostname";
+ private static final String TASK_MANAGER = "tm";
+
+ private static final String HELP_PREFIX = "# HELP ";
+ private static final String TYPE_PREFIX = "# TYPE ";
+ private static final String DIMENSIONS = "host=\"" + HOST_NAME + "\",tm_id=\"" + TASK_MANAGER + "\"";
+ private static final String DEFAULT_LABELS = "{" + DIMENSIONS + ",}";
+ private static final String SCOPE_PREFIX = "flink_taskmanager_";
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter()));
+ private final MetricReporter reporter = registry.getReporters().get(0);
+
+ @Test
+ public void counterIsReportedAsPrometheusGauge() throws UnirestException {
+ //Prometheus counters may not decrease
+ Counter testCounter = new SimpleCounter();
+ testCounter.inc(7);
+
+ String counterName = "testCounter";
+ String gaugeName = SCOPE_PREFIX + counterName;
+
+ assertThat(addMetricAndPollResponse(testCounter, counterName),
+ equalTo(HELP_PREFIX + gaugeName + " " + getFullMetricName(counterName) + "\n" +
+ TYPE_PREFIX + gaugeName + " gauge" + "\n" +
+ gaugeName + DEFAULT_LABELS + " 7.0" + "\n"));
+ }
+
+ @Test
+ public void gaugeIsReportedAsPrometheusGauge() throws UnirestException {
+ Gauge<Integer> testGauge = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return 1;
+ }
+ };
+
+ String gaugeName = "testGauge";
+ String prometheusGaugeName = SCOPE_PREFIX + gaugeName;
+
+ assertThat(addMetricAndPollResponse(testGauge, gaugeName),
+ equalTo(HELP_PREFIX + prometheusGaugeName + " " + getFullMetricName(gaugeName) + "\n" +
+ TYPE_PREFIX + prometheusGaugeName + " gauge" + "\n" +
+ prometheusGaugeName + DEFAULT_LABELS + " 1.0" + "\n"));
+ }
+
+ @Test
+ public void histogramIsReportedAsPrometheusSummary() throws UnirestException {
+ Histogram testHistogram = new TestingHistogram();
+
+ String histogramName = "testHistogram";
+ String summaryName = SCOPE_PREFIX + histogramName;
+
+ String response = addMetricAndPollResponse(testHistogram, histogramName);
+ assertThat(response, containsString(HELP_PREFIX + summaryName + " " + getFullMetricName(histogramName) + "\n" +
+ TYPE_PREFIX + summaryName + " summary" + "\n" +
+ summaryName + "_count" + DEFAULT_LABELS + " 1.0" + "\n"));
+ for (String quantile : Arrays.asList("0.5", "0.75", "0.95", "0.98", "0.99", "0.999")) {
+ assertThat(response, containsString(
+ summaryName + "{" + DIMENSIONS + ",quantile=\"" + quantile + "\",} " + quantile + "\n"));
+ }
+ }
+
+ @Test
+ public void meterRateIsReportedAsPrometheusGauge() throws UnirestException {
+ Meter testMeter = new TestMeter();
+
+ String meterName = "testMeter";
+ String counterName = SCOPE_PREFIX + meterName;
+
+ assertThat(addMetricAndPollResponse(testMeter, meterName),
+ equalTo(HELP_PREFIX + counterName + " " + getFullMetricName(meterName) + "\n" +
+ TYPE_PREFIX + counterName + " gauge" + "\n" +
+ counterName + DEFAULT_LABELS + " 5.0" + "\n"));
+ }
+
+ @Test
+ public void endpointIsUnavailableAfterReporterIsClosed() throws UnirestException {
+ reporter.close();
+ thrown.expect(UnirestException.class);
+ pollMetrics();
+ }
+
+ @Test
+ public void invalidCharactersAreReplacedWithUnderscore() {
+ assertThat(PrometheusReporter.replaceInvalidChars(""), equalTo(""));
+ assertThat(PrometheusReporter.replaceInvalidChars("abc"), equalTo("abc"));
+ assertThat(PrometheusReporter.replaceInvalidChars("abc\""), equalTo("abc_"));
+ assertThat(PrometheusReporter.replaceInvalidChars("\"abc"), equalTo("_abc"));
+ assertThat(PrometheusReporter.replaceInvalidChars("\"abc\""), equalTo("_abc_"));
+ assertThat(PrometheusReporter.replaceInvalidChars("\"a\"b\"c\""), equalTo("_a_b_c_"));
+ assertThat(PrometheusReporter.replaceInvalidChars("\"\"\"\""), equalTo("____"));
+ assertThat(PrometheusReporter.replaceInvalidChars(" "), equalTo("____"));
+ assertThat(PrometheusReporter.replaceInvalidChars("\"ab ;(c)'"), equalTo("_ab___c__"));
+ assertThat(PrometheusReporter.replaceInvalidChars("a b c"), equalTo("a_b_c"));
+ assertThat(PrometheusReporter.replaceInvalidChars("a b c "), equalTo("a_b_c_"));
+ assertThat(PrometheusReporter.replaceInvalidChars("a;b'c*"), equalTo("a_b_c_"));
+ assertThat(PrometheusReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"), equalTo("a___:__b___:__c"));
+ }
+
+ private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException {
+ reporter.notifyOfAddedMetric(metric, metricName, new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)));
+ return pollMetrics().getBody();
+ }
+
+ private static HttpResponse<String> pollMetrics() throws UnirestException {
+ return Unirest.get("http://localhost:" + NON_DEFAULT_PORT + "/metrics").asString();
+ }
+
+ private static String getFullMetricName(String metricName) {
+ return HOST_NAME + SCOPE_SEPARATOR + "taskmanager" + SCOPE_SEPARATOR + TASK_MANAGER + SCOPE_SEPARATOR + metricName;
+ }
+
+ private static Configuration createConfigWithOneReporter() {
+ Configuration cfg = new Configuration();
+ cfg.setString(MetricOptions.REPORTERS_LIST, "test1");
+ cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." +
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getName());
+ cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ARG_PORT, "" + NON_DEFAULT_PORT);
+ return cfg;
+ }
+
+ @After
+ public void closeReporterAndShutdownRegistry() {
+ reporter.close();
+ registry.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 3be21aa..3b655c4 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -39,6 +39,7 @@ under the License.
<module>flink-metrics-ganglia</module>
<module>flink-metrics-graphite</module>
<module>flink-metrics-jmx</module>
+ <module>flink-metrics-prometheus</module>
<module>flink-metrics-statsd</module>
<module>flink-metrics-datadog</module>
</modules>
http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 90de6a1..d73f681 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,7 @@ under the License.
<curator.version>2.12.0</curator.version>
<jackson.version>2.7.4</jackson.version>
<metrics.version>3.1.0</metrics.version>
+ <prometheus.version>0.0.21</prometheus.version>
<junit.version>4.12</junit.version>
<mockito.version>1.10.19</mockito.version>
<powermock.version>1.6.5</powermock.version>