You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:32 UTC

[05/10] flink git commit: [FLINK-8080][metrics] metrics.reporters now optional include list

[FLINK-8080][metrics] metrics.reporters now optional include list

This closes #5099.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/493c2857
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/493c2857
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/493c2857

Branch: refs/heads/master
Commit: 493c28571f22d9dde4edbec0ba38f2761fb51335
Parents: 42b0114
Author: zentol <ch...@apache.org>
Authored: Wed Nov 15 12:56:30 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 19:09:15 2017 +0100

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  9 +---
 .../connectors/kafka/KafkaTestBase.java         |  2 -
 .../flink/configuration/MetricOptions.java      |  8 ++--
 .../ScheduledDropwizardReporterTest.java        |  1 -
 .../DropwizardFlinkHistogramWrapperTest.java    |  2 -
 .../flink/metrics/jmx/JMXReporterTest.java      |  6 ---
 .../jobmanager/JMXJobManagerMetricTest.java     |  1 -
 .../prometheus/PrometheusReporterTest.java      |  2 -
 .../flink/metrics/slf4j/Slf4jReporterTest.java  |  1 -
 .../metrics/statsd/StatsDReporterTest.java      |  3 --
 .../metrics/MetricRegistryConfiguration.java    | 45 +++++++++++++++++---
 .../runtime/metrics/MetricRegistryImpl.java     |  2 +-
 .../runtime/metrics/MetricRegistryImplTest.java | 13 ++----
 .../metrics/groups/AbstractMetricGroupTest.java |  1 -
 .../groups/MetricGroupRegistrationTest.java     |  2 -
 15 files changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index e4b5161..2a9ab50 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -560,11 +560,11 @@ counter = getRuntimeContext()
 Metrics can be exposed to an external system by configuring one or several reporters in `conf/flink-conf.yaml`. These
 reporters will be instantiated on each job and task manager when they are started.
 
-- `metrics.reporters`: The list of named reporters.
 - `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`.
 - `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`.
 - `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`.
 - `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the identifier (default value use `metrics.scope.delimiter`) for the reporter named `<name>`.
+- `metrics.reporters`: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used.
 
 All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below,
 we will list more settings specific to each reporter.
@@ -606,7 +606,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: jmx
 metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
 metrics.reporter.jmx.port: 8789
 
@@ -642,7 +641,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: gang
 metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
 metrics.reporter.gang.host: localhost
 metrics.reporter.gang.port: 8649
@@ -668,7 +666,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: grph
 metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
 metrics.reporter.grph.host: localhost
 metrics.reporter.grph.port: 2003
@@ -689,7 +686,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: prom
 metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
 
 {% endhighlight %}
@@ -719,7 +715,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: stsd
 metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
 metrics.reporter.stsd.host: localhost
 metrics.reporter.stsd.port: 8125
@@ -743,7 +738,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: dghttp
 metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
 metrics.reporter.dghttp.apikey: xxx
 metrics.reporter.dghttp.tags: myflinkapp,prod
@@ -760,7 +754,6 @@ Example configuration:
 
 {% highlight yaml %}
 
-metrics.reporters: slf4j
 metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
 metrics.reporter.slf4j.interval: 60 SECONDS
 

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 74485b4..f471cd4 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -21,7 +21,6 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -136,7 +135,6 @@ public abstract class KafkaTestBase extends TestLogger {
 		flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
 		flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 16L);
 		flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
-		flinkConfig.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
 		flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		return flinkConfig;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 8a328ac..42eb575 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -25,12 +25,12 @@ import static org.apache.flink.configuration.ConfigOptions.key;
 public class MetricOptions {
 
 	/**
-	 * The list of named reporters. Names are defined here and per-reporter configs
-	 * are given with the reporter config prefix and the reporter name.
+	 * An optional list of reporter names. If configured, only reporters whose name matches any of the names in the list
+	 * will be started. Otherwise, all reporters that could be found in the configuration will be started.
 	 *
-	 * Example:
+	 * <p>Example:
 	 * <pre>{@code
-	 * metrics.reporters = foo, bar
+	 * metrics.reporters = foo,bar
 	 *
 	 * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
 	 * metrics.reporter.foo.interval = 10

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 3fa0474..4a2ca3a 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -82,7 +82,6 @@ public class ScheduledDropwizardReporterTest {
 		String taskManagerId = "tas:kMana::ger";
 		String counterName = "testCounter";
 
-		configuration.setString(MetricOptions.REPORTERS_LIST, "test");
 		configuration.setString(
 				ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
 				"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 8f70abb..fb21a75b 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.dropwizard.metrics;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -101,7 +100,6 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
 		int size = 10;
 		String histogramMetricName = "histogram";
 		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 067b08f..40b7f15 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.metrics.jmx;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestHistogram;
@@ -100,7 +99,6 @@ public class JMXReporterTest extends TestLogger {
 	@Test
 	public void testPortConflictHandling() throws Exception {
 		Configuration cfg = new Configuration();
-		cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035");
@@ -160,8 +158,6 @@ public class JMXReporterTest extends TestLogger {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
 
-		cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
-
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055");
 
@@ -236,7 +232,6 @@ public class JMXReporterTest extends TestLogger {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
 			registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
@@ -286,7 +281,6 @@ public class JMXReporterTest extends TestLogger {
 
 		try {
 			Configuration config = new Configuration();
-			config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
 			registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 7280476..c74db28 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -64,7 +64,6 @@ public class JMXJobManagerMetricTest {
 		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
 		Configuration flinkConfiguration = new Configuration();
 
-		flinkConfiguration.setString(MetricOptions.REPORTERS_LIST, "test");
 		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index d905c25..39cf4be 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.metrics.prometheus;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -279,7 +278,6 @@ public class PrometheusReporterTest extends TestLogger {
 
 	static Configuration createConfigWithOneReporter(String reporterName, String portString) {
 		Configuration cfg = new Configuration();
-		cfg.setString(MetricOptions.REPORTERS_LIST, reporterName);
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ARG_PORT, portString);
 		return cfg;

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
index f538bc7..b344f45 100644
--- a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
+++ b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
@@ -63,7 +63,6 @@ public class Slf4jReporterTest extends TestLogger {
 		TestUtils.addTestAppenderForRootLogger();
 
 		Configuration configuration = new Configuration();
-		configuration.setString(MetricOptions.REPORTERS_LIST, "slf4j");
 		configuration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "slf4j." +
 			ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, Slf4jReporter.class.getName());
 		configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 275f2e1..08d4998 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -81,7 +81,6 @@ public class StatsDReporterTest extends TestLogger {
 		String taskManagerId = "tas:kMana::ger";
 		String counterName = "testCounter";
 
-		configuration.setString(MetricOptions.REPORTERS_LIST, "test");
 		configuration.setString(
 				ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
 				"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
@@ -151,7 +150,6 @@ public class StatsDReporterTest extends TestLogger {
 			int port = receiver.getPort();
 
 			Configuration config = new Configuration();
-			config.setString(MetricOptions.REPORTERS_LIST, "test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
@@ -225,7 +223,6 @@ public class StatsDReporterTest extends TestLogger {
 			int port = receiver.getPort();
 
 			Configuration config = new Configuration();
-			config.setString(MetricOptions.REPORTERS_LIST, "test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index d07cb65..7188a59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -32,7 +32,11 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * Configuration object for {@link MetricRegistryImpl}.
@@ -44,7 +48,14 @@ public class MetricRegistryConfiguration {
 	private static volatile MetricRegistryConfiguration defaultConfiguration;
 
 	// regex pattern to split the defined reporters
-	private static final Pattern splitPattern = Pattern.compile("\\s*,\\s*");
+	private static final Pattern reporterListPattern = Pattern.compile("\\s*,\\s*");
+
+	// regex pattern to extract the name from reporter configuration keys, e.g. "rep" from "metrics.reporter.rep.class"
+	private static final Pattern reporterClassPattern = Pattern.compile(
+		Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) +
+		// [\S&&[^.]] = intersection of non-whitespace and non-period character classes
+		"([\\S&&[^.]]*)\\." +
+		Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX));
 
 	// scope formats for the different components
 	private final ScopeFormats scopeFormats;
@@ -108,15 +119,37 @@ public class MetricRegistryConfiguration {
 			delim = '.';
 		}
 
-		final String definedReporters = configuration.getString(MetricOptions.REPORTERS_LIST);
+		String includedReportersString = configuration.getString(MetricOptions.REPORTERS_LIST, "");
+		Set<String> includedReporters = reporterListPattern.splitAsStream(includedReportersString)
+			.collect(Collectors.toSet());
+
+		// use a TreeSet to make the reporter order deterministic, which is useful for testing
+		Set<String> namedReporters = new TreeSet<>(String::compareTo);
+		// scan entire configuration for "metric.reporter" keys and parse individual reporter configurations
+		for (String key : configuration.keySet()) {
+			if (key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+				Matcher matcher = reporterClassPattern.matcher(key);
+				if (matcher.matches()) {
+					String reporterName = matcher.group(1);
+					if (includedReporters.isEmpty() || includedReporters.contains(reporterName)) {
+						if (namedReporters.contains(reporterName)) {
+							LOG.warn("Duplicate class configuration detected for reporter {}.", reporterName);
+						} else {
+							namedReporters.add(reporterName);
+						}
+					} else {
+						LOG.info("Excluding reporter {}, not configured in reporter list ({}).", reporterName, includedReportersString);
+					}
+				}
+			}
+		}
+
 		List<Tuple2<String, Configuration>> reporterConfigurations;
 
-		if (definedReporters == null) {
+		if (namedReporters.isEmpty()) {
 			reporterConfigurations = Collections.emptyList();
 		} else {
-			String[] namedReporters = splitPattern.split(definedReporters);
-
-			reporterConfigurations = new ArrayList<>(namedReporters.length);
+			reporterConfigurations = new ArrayList<>(namedReporters.size());
 
 			for (String namedReporter: namedReporters) {
 				DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index d36d095..c8f4490 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -137,7 +137,7 @@ public class MetricRegistryImpl implements MetricRegistry {
 
 					MetricConfig metricConfig = new MetricConfig();
 					reporterConfig.addAllToProperties(metricConfig);
-					LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig);
+					LOG.info("Configuring {} with {}.", namedReporter, metricConfig);
 					reporterInstance.open(metricConfig);
 
 					if (reporterInstance instanceof Scheduled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
index b0b20b2..2eccc0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -70,20 +70,22 @@ public class MetricRegistryImplTest extends TestLogger {
 	}
 
 	/**
-	 * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
+	 * Verifies that the reporter name list is correctly used to determine which reporters should be instantiated.
 	 */
 	@Test
-	public void testReporterInstantiation() {
+	public void testReporterInclusion() {
 		Configuration config = new Configuration();
 
 		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
 
 		MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
 		assertTrue(metricRegistry.getReporters().size() == 1);
 
 		Assert.assertTrue(TestReporter1.wasOpened);
+		Assert.assertFalse(TestReporter11.wasOpened);
 
 		metricRegistry.shutdown();
 	}
@@ -107,7 +109,6 @@ public class MetricRegistryImplTest extends TestLogger {
 	public void testMultipleReporterInstantiation() {
 		Configuration config = new Configuration();
 
-		config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
@@ -166,7 +167,6 @@ public class MetricRegistryImplTest extends TestLogger {
 	public void testReporterArgumentForwarding() {
 		Configuration config = new Configuration();
 
-		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
@@ -197,7 +197,6 @@ public class MetricRegistryImplTest extends TestLogger {
 	public void testReporterScheduling() throws InterruptedException {
 		Configuration config = new Configuration();
 
-		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
@@ -247,7 +246,6 @@ public class MetricRegistryImplTest extends TestLogger {
 	@Test
 	public void testReporterNotifications() {
 		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
 
@@ -356,7 +354,6 @@ public class MetricRegistryImplTest extends TestLogger {
 	@Test
 	public void testConfigurableDelimiterForReporters() {
 		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -379,7 +376,6 @@ public class MetricRegistryImplTest extends TestLogger {
 	@Test
 	public void testConfigurableDelimiterForReportersInGroup() {
 		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -459,7 +455,6 @@ public class MetricRegistryImplTest extends TestLogger {
 	public void testExceptionIsolation() throws Exception {
 
 		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, FailingReporter.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 8d91b81..325982b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -84,7 +84,6 @@ public class AbstractMetricGroupTest {
 	public void testScopeCachingForMultipleReporters() throws Exception {
 		Configuration config = new Configuration();
 		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
-		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/493c2857/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index 324bb73..bcdcd63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -47,7 +46,6 @@ public class MetricGroupRegistrationTest extends TestLogger {
 	@Test
 	public void testMetricInstantiation() {
 		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 
 		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));