You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 10:06:40 UTC

[07/16] flink git commit: [FLINK-6221] Add PrometheusReporter

[FLINK-6221] Add PrometheusReporter

This closes #3833.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb9505dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb9505dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb9505dd

Branch: refs/heads/master
Commit: bb9505ddee6fbf42ef24853dab8475cff9c7948d
Parents: cd45825
Author: Maximilian Bode <ma...@tngtech.com>
Authored: Sat May 6 02:49:42 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sat Jul 1 10:02:08 2017 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  29 ++
 flink-dist/pom.xml                              |   7 +
 flink-dist/src/main/assemblies/opt.xml          |   7 +
 flink-metrics/flink-metrics-prometheus/pom.xml  | 129 +++++++++
 .../metrics/prometheus/PrometheusReporter.java  | 269 +++++++++++++++++++
 .../prometheus/PrometheusReporterTest.java      | 190 +++++++++++++
 .../src/test/resources/log4j-test.properties    |  27 ++
 flink-metrics/pom.xml                           |   1 +
 pom.xml                                         |   1 +
 9 files changed, 660 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index fc7b595..06ed9ef 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -415,6 +415,35 @@ metrics.reporter.grph.protocol: TCP
 
 {% endhighlight %}
 
+### Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
+
+In order to use this reporter you must copy `/opt/flink-metrics-prometheus-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Parameters:
+
+- `port` - (optional) the port the Prometheus exporter listens on, defaults to [9249](https://github.com/prometheus/prometheus/wiki/Default-port-allocations).
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: prom
+metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
+
+{% endhighlight %}
+
+Flink metric types are mapped to Prometheus metric types as follows: 
+
+| Flink     | Prometheus | Note                                     |
+| --------- |------------|------------------------------------------|
+| Counter   | Gauge      |Prometheus counters cannot be decremented.|
+| Gauge     | Gauge      |                                          |
+| Histogram | Summary    |Quantiles .5, .75, .95, .98, .99 and .999 |
+| Meter     | Gauge      |The gauge exports the meter's rate.       |
+
+All Flink metrics variables, such as `<host>`, `<job_name>`, `<tm_id>`, `<subtask_index>`, `<task_name>` and `<operator_name>`, are exported to Prometheus as labels. 
+
 ### StatsD (org.apache.flink.metrics.statsd.StatsDReporter)
 
 In order to use this reporter you must copy `/opt/flink-metrics-statsd-{{site.version}}.jar` into the `/lib` folder

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 4070dbb..06a656d 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -264,6 +264,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-prometheus</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-metrics-statsd</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index bb04d28..0afa1d3 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -98,6 +98,13 @@
 		</file>
 
 		<file>
+			<source>../flink-metrics/flink-metrics-prometheus/target/flink-metrics-prometheus-${project.version}-shaded.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-metrics-prometheus-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
+		<file>
 			<source>../flink-metrics/flink-metrics-statsd/target/flink-metrics-statsd-${project.version}.jar</source>
 			<outputDirectory>opt/</outputDirectory>
 			<destName>flink-metrics-statsd-${project.version}.jar</destName>

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/pom.xml b/flink-metrics/flink-metrics-prometheus/pom.xml
new file mode 100644
index 0000000..4884433
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/pom.xml
@@ -0,0 +1,129 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-metrics</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-metrics-prometheus</artifactId>
+	<name>flink-metrics-prometheus</name>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-annotations</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-metrics-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>io.prometheus</groupId>
+			<artifactId>simpleclient</artifactId>
+			<version>${prometheus.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>io.prometheus</groupId>
+			<artifactId>simpleclient_servlet</artifactId>
+			<version>${prometheus.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.nanohttpd</groupId>
+			<artifactId>nanohttpd</artifactId>
+			<version>2.2.0</version>
+		</dependency>
+
+		<!-- test dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>com.mashape.unirest</groupId>
+			<artifactId>unirest-java</artifactId>
+			<version>1.4.9</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadedArtifactAttached>true</shadedArtifactAttached>
+							<relocations combine.children="append">
+								<relocation>
+									<pattern>io.prometheus.client</pattern>
+									<shadedPattern>org.apache.flink.shaded.io.prometheus.client</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>fi.iki.elonen</pattern>
+									<shadedPattern>org.apache.flink.shaded.fi.iki.elonen</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
new file mode 100644
index 0000000..d23be8c
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/main/java/org/apache/flink/metrics/prometheus/PrometheusReporter.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+
+import fi.iki.elonen.NanoHTTPD;
+import io.prometheus.client.Collector;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.common.TextFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus.
+ */
+@PublicEvolving
+public class PrometheusReporter implements MetricReporter {
+	private static final Logger LOG = LoggerFactory.getLogger(PrometheusReporter.class);
+
+	static final String ARG_PORT = "port";
+	private static final int DEFAULT_PORT = 9249;
+
+	private static final Pattern UNALLOWED_CHAR_PATTERN = Pattern.compile("[^a-zA-Z0-9:_]");
+	private static final CharacterFilter CHARACTER_FILTER = new CharacterFilter() {
+		@Override
+		public String filterCharacters(String input) {
+			return replaceInvalidChars(input);
+		}
+	};
+
+	private static final char SCOPE_SEPARATOR = '_';
+	private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;
+
+	private PrometheusEndpoint prometheusEndpoint;
+	private final Map<String, Collector> collectorsByMetricName = new HashMap<>();
+
+	@VisibleForTesting
+	static String replaceInvalidChars(final String input) {
+		// https://prometheus.io/docs/instrumenting/writing_exporters/
+		// Only [a-zA-Z0-9:_] are valid in metric names, any other characters should be sanitized to an underscore.
+		return UNALLOWED_CHAR_PATTERN.matcher(input).replaceAll("_");
+	}
+
+	@Override
+	public void open(MetricConfig config) {
+		int port = config.getInteger(ARG_PORT, DEFAULT_PORT);
+		LOG.info("Using port {}.", port);
+		prometheusEndpoint = new PrometheusEndpoint(port);
+		try {
+			prometheusEndpoint.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
+		} catch (IOException e) {
+			final String msg = "Could not start PrometheusEndpoint on port " + port;
+			LOG.warn(msg, e);
+			throw new RuntimeException(msg, e);
+		}
+	}
+
+	@Override
+	public void close() {
+		prometheusEndpoint.stop();
+		CollectorRegistry.defaultRegistry.clear();
+	}
+
+	@Override
+	public void notifyOfAddedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+		final String scope = SCOPE_PREFIX + getLogicalScope(group);
+
+		List<String> dimensionKeys = new LinkedList<>();
+		List<String> dimensionValues = new LinkedList<>();
+		for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
+			final String key = dimension.getKey();
+			dimensionKeys.add(CHARACTER_FILTER.filterCharacters(key.substring(1, key.length() - 1)));
+			dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
+		}
+
+		final String validMetricName = scope + SCOPE_SEPARATOR + CHARACTER_FILTER.filterCharacters(metricName);
+		final String metricIdentifier = group.getMetricIdentifier(metricName);
+		final Collector collector;
+		if (metric instanceof Gauge) {
+			collector = createGauge((Gauge) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+		} else if (metric instanceof Counter) {
+			collector = createGauge((Counter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+		} else if (metric instanceof Meter) {
+			collector = createGauge((Meter) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+		} else if (metric instanceof Histogram) {
+			collector = createSummary((Histogram) metric, validMetricName, metricIdentifier, dimensionKeys, dimensionValues);
+		} else {
+			LOG.warn("Cannot add unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
+				metric.getClass().getName());
+			return;
+		}
+		collector.register();
+		collectorsByMetricName.put(metricName, collector);
+	}
+
+	@Override
+	public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {
+		CollectorRegistry.defaultRegistry.unregister(collectorsByMetricName.get(metricName));
+		collectorsByMetricName.remove(metricName);
+	}
+
+	@SuppressWarnings("unchecked")
+	private static String getLogicalScope(MetricGroup group) {
+		return ((FrontMetricGroup<AbstractMetricGroup<?>>) group).getLogicalScope(CHARACTER_FILTER, SCOPE_SEPARATOR);
+	}
+
+	private Collector createGauge(final Gauge gauge, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
+		return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				final Object value = gauge.getValue();
+				if (value instanceof Double) {
+					return (double) value;
+				}
+				if (value instanceof Number) {
+					return ((Number) value).doubleValue();
+				} else if (value instanceof Boolean) {
+					return ((Boolean) value) ? 1 : 0;
+				} else {
+					LOG.debug("Invalid type for Gauge {}: {}, only number types and booleans are supported by this reporter.",
+						gauge, value.getClass().getName());
+					return 0;
+				}
+			}
+		});
+	}
+
+	private static Collector createGauge(final Counter counter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
+		return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				return (double) counter.getCount();
+			}
+		});
+	}
+
+	private Collector createGauge(final Meter meter, final String name, final String identifier, final List<String> labelNames, final List<String> labelValues) {
+		return newGauge(name, identifier, labelNames, labelValues, new io.prometheus.client.Gauge.Child() {
+			@Override
+			public double get() {
+				return meter.getRate();
+			}
+		});
+	}
+
+	private static Collector newGauge(String name, String identifier, List<String> labelNames, List<String> labelValues, io.prometheus.client.Gauge.Child child) {
+		return io.prometheus.client.Gauge
+			.build()
+			.name(name)
+			.help(identifier)
+			.labelNames(toArray(labelNames))
+			.create()
+			.setChild(child, toArray(labelValues));
+	}
+
+	private static HistogramSummaryProxy createSummary(final Histogram histogram, final String name, final String identifier, final List<String> dimensionKeys, final List<String> dimensionValues) {
+		return new HistogramSummaryProxy(histogram, name, identifier, dimensionKeys, dimensionValues);
+	}
+
+	static class PrometheusEndpoint extends NanoHTTPD {
+		static final String MIME_TYPE = "plain/text";
+
+		PrometheusEndpoint(int port) {
+			super(port);
+		}
+
+		@Override
+		public Response serve(IHTTPSession session) {
+			if (session.getUri().equals("/metrics")) {
+				StringWriter writer = new StringWriter();
+				try {
+					TextFormat.write004(writer, CollectorRegistry.defaultRegistry.metricFamilySamples());
+				} catch (IOException e) {
+					return newFixedLengthResponse(Response.Status.INTERNAL_ERROR, MIME_TYPE, "Unable to output metrics");
+				}
+				return newFixedLengthResponse(Response.Status.OK, TextFormat.CONTENT_TYPE_004, writer.toString());
+			} else {
+				return newFixedLengthResponse(Response.Status.NOT_FOUND, MIME_TYPE, "Not found");
+			}
+		}
+	}
+
+	private static class HistogramSummaryProxy extends Collector {
+		private static final List<Double> QUANTILES = Arrays.asList(.5, .75, .95, .98, .99, .999);
+
+		private final Histogram histogram;
+		private final String metricName;
+		private final String metricIdentifier;
+		private final List<String> labelNamesWithQuantile;
+		private final List<String> labelValues;
+
+		HistogramSummaryProxy(final Histogram histogram, final String metricName, final String metricIdentifier, final List<String> labelNames, final List<String> labelValues) {
+			this.histogram = histogram;
+			this.metricName = metricName;
+			this.metricIdentifier = metricIdentifier;
+			this.labelNamesWithQuantile = addToList(labelNames, "quantile");
+			this.labelValues = labelValues;
+		}
+
+		@Override
+		public List<MetricFamilySamples> collect() {
+			// We cannot use SummaryMetricFamily because it is impossible to get a sum of all values (at least for Dropwizard histograms,
+			// whose snapshot's values array only holds a sample of recent values).
+
+			final HistogramStatistics statistics = histogram.getStatistics();
+
+			List<MetricFamilySamples.Sample> samples = new LinkedList<>();
+			samples.add(new MetricFamilySamples.Sample(metricName + "_count",
+				labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
+			for (final Double quantile : QUANTILES) {
+				samples.add(new MetricFamilySamples.Sample(metricName, labelNamesWithQuantile,
+					addToList(labelValues, quantile.toString()),
+					statistics.getQuantile(quantile)));
+			}
+			return Collections.singletonList(new MetricFamilySamples(metricName, Type.SUMMARY, metricIdentifier, samples));
+		}
+	}
+
+	private static List<String> addToList(List<String> list, String element) {
+		final List<String> result = new ArrayList<>(list);
+		result.add(element);
+		return result;
+	}
+
+	private static String[] toArray(List<String> labelNames) {
+		return labelNames.toArray(new String[labelNames.size()]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
new file mode 100644
index 0000000..83b7b41
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.prometheus;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestMeter;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.TestLogger;
+
+import com.mashape.unirest.http.HttpResponse;
+import com.mashape.unirest.http.Unirest;
+import com.mashape.unirest.http.exceptions.UnirestException;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.Arrays;
+
+import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
+import static org.apache.flink.runtime.metrics.scope.ScopeFormat.SCOPE_SEPARATOR;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test for {@link PrometheusReporter}.
+ */
+public class PrometheusReporterTest extends TestLogger {
+	private static final int NON_DEFAULT_PORT = 9429;
+
+	private static final String HOST_NAME = "hostname";
+	private static final String TASK_MANAGER = "tm";
+
+	private static final String HELP_PREFIX = "# HELP ";
+	private static final String TYPE_PREFIX = "# TYPE ";
+	private static final String DIMENSIONS = "host=\"" + HOST_NAME + "\",tm_id=\"" + TASK_MANAGER + "\"";
+	private static final String DEFAULT_LABELS = "{" + DIMENSIONS + ",}";
+	private static final String SCOPE_PREFIX = "flink_taskmanager_";
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter()));
+	private final MetricReporter reporter = registry.getReporters().get(0);
+
+	@Test
+	public void counterIsReportedAsPrometheusGauge() throws UnirestException {
+		//Prometheus counters may not decrease
+		Counter testCounter = new SimpleCounter();
+		testCounter.inc(7);
+
+		String counterName = "testCounter";
+		String gaugeName = SCOPE_PREFIX + counterName;
+
+		assertThat(addMetricAndPollResponse(testCounter, counterName),
+			equalTo(HELP_PREFIX + gaugeName + " " + getFullMetricName(counterName) + "\n" +
+				TYPE_PREFIX + gaugeName + " gauge" + "\n" +
+				gaugeName + DEFAULT_LABELS + " 7.0" + "\n"));
+	}
+
+	@Test
+	public void gaugeIsReportedAsPrometheusGauge() throws UnirestException {
+		Gauge<Integer> testGauge = new Gauge<Integer>() {
+			@Override
+			public Integer getValue() {
+				return 1;
+			}
+		};
+
+		String gaugeName = "testGauge";
+		String prometheusGaugeName = SCOPE_PREFIX + gaugeName;
+
+		assertThat(addMetricAndPollResponse(testGauge, gaugeName),
+			equalTo(HELP_PREFIX + prometheusGaugeName + " " + getFullMetricName(gaugeName) + "\n" +
+				TYPE_PREFIX + prometheusGaugeName + " gauge" + "\n" +
+				prometheusGaugeName + DEFAULT_LABELS + " 1.0" + "\n"));
+	}
+
+	@Test
+	public void histogramIsReportedAsPrometheusSummary() throws UnirestException {
+		Histogram testHistogram = new TestingHistogram();
+
+		String histogramName = "testHistogram";
+		String summaryName = SCOPE_PREFIX + histogramName;
+
+		String response = addMetricAndPollResponse(testHistogram, histogramName);
+		assertThat(response, containsString(HELP_PREFIX + summaryName + " " + getFullMetricName(histogramName) + "\n" +
+			TYPE_PREFIX + summaryName + " summary" + "\n" +
+			summaryName + "_count" + DEFAULT_LABELS + " 1.0" + "\n"));
+		for (String quantile : Arrays.asList("0.5", "0.75", "0.95", "0.98", "0.99", "0.999")) {
+			assertThat(response, containsString(
+				summaryName + "{" + DIMENSIONS + ",quantile=\"" + quantile + "\",} " + quantile + "\n"));
+		}
+	}
+
+	@Test
+	public void meterRateIsReportedAsPrometheusGauge() throws UnirestException {
+		Meter testMeter = new TestMeter();
+
+		String meterName = "testMeter";
+		String counterName = SCOPE_PREFIX + meterName;
+
+		assertThat(addMetricAndPollResponse(testMeter, meterName),
+			equalTo(HELP_PREFIX + counterName + " " + getFullMetricName(meterName) + "\n" +
+				TYPE_PREFIX + counterName + " gauge" + "\n" +
+				counterName + DEFAULT_LABELS + " 5.0" + "\n"));
+	}
+
+	@Test
+	public void endpointIsUnavailableAfterReporterIsClosed() throws UnirestException {
+		reporter.close();
+		thrown.expect(UnirestException.class);
+		pollMetrics();
+	}
+
+	@Test
+	public void invalidCharactersAreReplacedWithUnderscore() {
+		assertThat(PrometheusReporter.replaceInvalidChars(""), equalTo(""));
+		assertThat(PrometheusReporter.replaceInvalidChars("abc"), equalTo("abc"));
+		assertThat(PrometheusReporter.replaceInvalidChars("abc\""), equalTo("abc_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"abc"), equalTo("_abc"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"abc\""), equalTo("_abc_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"a\"b\"c\""), equalTo("_a_b_c_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"\"\"\""), equalTo("____"));
+		assertThat(PrometheusReporter.replaceInvalidChars("    "), equalTo("____"));
+		assertThat(PrometheusReporter.replaceInvalidChars("\"ab ;(c)'"), equalTo("_ab___c__"));
+		assertThat(PrometheusReporter.replaceInvalidChars("a b c"), equalTo("a_b_c"));
+		assertThat(PrometheusReporter.replaceInvalidChars("a b c "), equalTo("a_b_c_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("a;b'c*"), equalTo("a_b_c_"));
+		assertThat(PrometheusReporter.replaceInvalidChars("a,=;:?'b,=;:?'c"), equalTo("a___:__b___:__c"));
+	}
+
+	private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException {
+		reporter.notifyOfAddedMetric(metric, metricName, new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)));
+		return pollMetrics().getBody();
+	}
+
+	private static HttpResponse<String> pollMetrics() throws UnirestException {
+		return Unirest.get("http://localhost:" + NON_DEFAULT_PORT + "/metrics").asString();
+	}
+
+	private static String getFullMetricName(String metricName) {
+		return HOST_NAME + SCOPE_SEPARATOR + "taskmanager" + SCOPE_SEPARATOR + TASK_MANAGER + SCOPE_SEPARATOR + metricName;
+	}
+
+	private static Configuration createConfigWithOneReporter() {
+		Configuration cfg = new Configuration();
+		cfg.setString(MetricOptions.REPORTERS_LIST, "test1");
+		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." +
+			ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getName());
+		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ARG_PORT, "" + NON_DEFAULT_PORT);
+		return cfg;
+	}
+
+	@After
+	public void closeReporterAndShutdownRegistry() {
+		reporter.close();
+		registry.shutdown();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties b/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2226f68
--- /dev/null
+++ b/flink-metrics/flink-metrics-prometheus/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+log4j.rootLogger=OFF, testlogger
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 3be21aa..3b655c4 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -39,6 +39,7 @@ under the License.
 		<module>flink-metrics-ganglia</module>
 		<module>flink-metrics-graphite</module>
 		<module>flink-metrics-jmx</module>
+		<module>flink-metrics-prometheus</module>
 		<module>flink-metrics-statsd</module>
 		<module>flink-metrics-datadog</module>
 	</modules>

http://git-wip-us.apache.org/repos/asf/flink/blob/bb9505dd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 90de6a1..d73f681 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,6 +108,7 @@ under the License.
 		<curator.version>2.12.0</curator.version>
 		<jackson.version>2.7.4</jackson.version>
 		<metrics.version>3.1.0</metrics.version>
+		<prometheus.version>0.0.21</prometheus.version>
 		<junit.version>4.12</junit.version>
 		<mockito.version>1.10.19</mockito.version>
 		<powermock.version>1.6.5</powermock.version>