You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/09/27 17:19:37 UTC

flink git commit: [FLINK-4695] Introduce MetricRegistryConfiguration to encapsulate MetricRegistry parameters

Repository: flink
Updated Branches:
  refs/heads/master 84672c22f -> f1b5b35f5


[FLINK-4695] Introduce MetricRegistryConfiguration to encapsulate MetricRegistry parameters

In order to decouple the MetricRegistry object instantiation from the global configuration,
the MetricRegistryConfiguration class has been introduced. This class encapsulates all
information required to create a MetricRegistry object. Furthermore, it encapsulates the
configuration parsing logic by offering a static method, fromConfiguration, which constructs
a MetricRegistryConfiguration object from a Configuration.

This closes #2555.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1b5b35f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1b5b35f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1b5b35f

Branch: refs/heads/master
Commit: f1b5b35f595e7ae53001a4c46edbf0c9b78ee376
Parents: 84672c2
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 26 16:49:23 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Sep 27 18:55:30 2016 +0200

----------------------------------------------------------------------
 .../ScheduledDropwizardReporterTest.java        |   5 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |   6 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |   9 +-
 .../metrics/statsd/StatsDReporterTest.java      |   7 +-
 .../flink/runtime/metrics/MetricRegistry.java   |  67 ++------
 .../metrics/MetricRegistryConfiguration.java    | 168 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   5 +-
 .../ExecutionGraphMetricsTest.java              |   3 +-
 .../runtime/metrics/MetricRegistryTest.java     |  14 +-
 .../metrics/dump/MetricQueryServiceTest.java    |   3 +-
 .../metrics/groups/AbstractMetricGroupTest.java |   4 +-
 .../metrics/groups/JobManagerGroupTest.java     |  11 +-
 .../metrics/groups/JobManagerJobGroupTest.java  |   9 +-
 .../groups/MetricGroupRegistrationTest.java     |   5 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  10 +-
 .../metrics/groups/OperatorGroupTest.java       |   7 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  11 +-
 .../metrics/groups/TaskManagerJobGroupTest.java |   9 +-
 .../metrics/groups/TaskMetricGroupTest.java     |  11 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |   4 +-
 ...askManagerComponentsStartupShutdownTest.java |   5 +-
 22 files changed, 265 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 1440028..9385510 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -78,7 +79,9 @@ public class ScheduledDropwizardReporterTest {
 		configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
 		configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
 
-		MetricRegistry metricRegistry = new MetricRegistry(configuration);
+		MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
+
+		MetricRegistry metricRegistry = new MetricRegistry(metricRegistryConfiguration);
 
 		char delimiter = metricRegistry.getDelimiter();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 9c2ce41..2f2a536 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -32,11 +32,11 @@ import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
-import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -102,8 +102,10 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
 
 		MetricRegistry registry = null;
 
+		MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
+
 		try {
-			registry = new MetricRegistry(config);
+			registry = new MetricRegistry(metricRegistryConfiguration);
 			DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
 				new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 089efe3..8968587 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
@@ -90,7 +91,7 @@ public class JMXReporterTest extends TestLogger {
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9020-9035");
 
-		MetricRegistry reg = new MetricRegistry(cfg);
+		MetricRegistry reg = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
 
@@ -144,7 +145,7 @@ public class JMXReporterTest extends TestLogger {
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9040-9055");
 
-		MetricRegistry reg = new MetricRegistry(cfg);
+		MetricRegistry reg = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
 
@@ -208,7 +209,7 @@ public class JMXReporterTest extends TestLogger {
 			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
-			registry = new MetricRegistry(config);
+			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 
@@ -258,7 +259,7 @@ public class JMXReporterTest extends TestLogger {
 			config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
-			registry = new MetricRegistry(config);
+			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index ad982f1..e53ef44 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -82,7 +83,7 @@ public class StatsDReporterTest extends TestLogger {
 		configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
 		configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
 
-		MetricRegistry metricRegistry = new MetricRegistry(configuration);
+		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
 
 		char delimiter = metricRegistry.getDelimiter();
 
@@ -150,7 +151,7 @@ public class StatsDReporterTest extends TestLogger {
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + port);
 
-			registry = new MetricRegistry(config);
+			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 
@@ -224,7 +225,7 @@ public class StatsDReporterTest extends TestLogger {
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + port);
 
-			registry = new MetricRegistry(config);
+			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 			TestMeter meter = new TestMeter();
 			metricGroup.meter(meterName, meter);

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 9c96858..ae44812 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.metrics;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
@@ -30,7 +30,6 @@ import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,43 +59,26 @@ public class MetricRegistry {
 	/**
 	 * Creates a new MetricRegistry and starts the configured reporter.
 	 */
-	public MetricRegistry(Configuration config) {
-		// first parse the scope formats, these are needed for all reporters
-		ScopeFormats scopeFormats;
-		try {
-			scopeFormats = createScopeConfig(config);
-		}
-		catch (Exception e) {
-			LOG.warn("Failed to parse scope format, using default scope formats", e);
-			scopeFormats = new ScopeFormats();
-		}
-		this.scopeFormats = scopeFormats;
-
-		char delim;
-		try {
-			delim = config.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);
-		} catch (Exception e) {
-			LOG.warn("Failed to parse delimiter, using default delimiter.", e);
-			delim = '.';
-		}
-		this.delimiter = delim;
+	public MetricRegistry(MetricRegistryConfiguration config) {
+		this.scopeFormats = config.getScopeFormats();
+		this.delimiter = config.getDelimiter();
 
 		// second, instantiate any custom configured reporters
 		this.reporters = new ArrayList<>();
 
-		final String definedReporters = config.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
+		List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
 
-		if (definedReporters == null) {
+		if (reporterConfigurations.isEmpty()) {
 			// no reporters defined
 			// by default, don't report anything
 			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
 			this.executor = null;
 		} else {
 			// we have some reporters so
-			String[] namedReporters = definedReporters.split("\\s*,\\s*");
-			for (String namedReporter : namedReporters) {
+			for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
+				String namedReporter = reporterConfiguration.f0;
+				Configuration reporterConfig = reporterConfiguration.f1;
 
-				DelegatingConfiguration reporterConfig = new DelegatingConfiguration(config, ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + ".");
 				final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
 				if (className == null) {
 					LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
@@ -129,13 +111,13 @@ public class MetricRegistry {
 					reporterInstance.open(metricConfig);
 
 					if (reporterInstance instanceof Scheduled) {
-						if (this.executor == null) {
+						if (executor == null) {
 							executor = Executors.newSingleThreadScheduledExecutor();
 						}
 						LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
 
 						executor.scheduleWithFixedDelay(
-								new ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
+								new MetricRegistry.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
 					} else {
 						LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
 					}
@@ -143,7 +125,7 @@ public class MetricRegistry {
 				}
 				catch (Throwable t) {
 					shutdownExecutor();
-					LOG.error("Could not instantiate metrics reporter " + namedReporter + ". Metrics might not be exposed/reported.", t);
+					LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
 				}
 			}
 		}
@@ -171,7 +153,7 @@ public class MetricRegistry {
 	}
 
 	/**
-	 * Shuts down this registry and the associated {@link org.apache.flink.metrics.reporter.MetricReporter}.
+	 * Shuts down this registry and the associated {@link MetricReporter}.
 	 */
 	public void shutdown() {
 		if (reporters != null) {
@@ -234,7 +216,7 @@ public class MetricRegistry {
 	}
 
 	/**
-	 * Un-registers the given {@link org.apache.flink.metrics.Metric} with this registry.
+	 * Un-registers the given {@link Metric} with this registry.
 	 *
 	 * @param metric      the metric that should be removed
 	 * @param metricName  the name of the metric
@@ -258,27 +240,6 @@ public class MetricRegistry {
 	}
 
 	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	static ScopeFormats createScopeConfig(Configuration config) {
-		String jmFormat = config.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
-		String jmJobFormat = config.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
-		String tmFormat = config.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-		String tmJobFormat = config.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
-		String taskFormat = config.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
-		String operatorFormat = config.getString(
-			ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
-
-		return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
-	}
-
-	// ------------------------------------------------------------------------
 
 	/**
 	 * This task is explicitly a static class, so that it does not hold any references to the enclosing

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
new file mode 100644
index 0000000..596fc82
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Configuration object for {@link MetricRegistry}.
+ */
+public class MetricRegistryConfiguration {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MetricRegistryConfiguration.class);
+
+	private static volatile MetricRegistryConfiguration DEFAULT_CONFIGURATION;
+
+	// regex pattern to split the defined reporters
+	private static final Pattern splitPattern = Pattern.compile("\\s*,\\s*");
+
+	// scope formats for the different components
+	private final ScopeFormats scopeFormats;
+
+	// delimiter for the scope strings
+	private final char delimiter;
+
+	// contains for every configured reporter its name and the configuration object
+	private final List<Tuple2<String, Configuration>> reporterConfigurations;
+
+	public MetricRegistryConfiguration(
+		ScopeFormats scopeFormats,
+		char delimiter,
+		List<Tuple2<String, Configuration>> reporterConfigurations) {
+
+		this.scopeFormats = Preconditions.checkNotNull(scopeFormats);
+		this.delimiter = delimiter;
+		this.reporterConfigurations = Preconditions.checkNotNull(reporterConfigurations);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Getters
+	// ------------------------------------------------------------------------
+
+	public ScopeFormats getScopeFormats() {
+		return scopeFormats;
+	}
+
+	public char getDelimiter() {
+		return delimiter;
+	}
+
+	public List<Tuple2<String, Configuration>> getReporterConfigurations() {
+		return reporterConfigurations;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Static factory methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Create a metric registry configuration object from the given {@link Configuration}.
+	 *
+	 * @param configuration to generate the metric registry configuration from
+	 * @return Metric registry configuration generated from the configuration
+	 */
+	public static MetricRegistryConfiguration fromConfiguration(Configuration configuration) {
+		ScopeFormats scopeFormats;
+		try {
+			scopeFormats = createScopeConfig(configuration);
+		} catch (Exception e) {
+			LOG.warn("Failed to parse scope format, using default scope formats", e);
+			scopeFormats = new ScopeFormats();
+		}
+
+		char delim;
+		try {
+			delim = configuration.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);
+		} catch (Exception e) {
+			LOG.warn("Failed to parse delimiter, using default delimiter.", e);
+			delim = '.';
+		}
+
+		final String definedReporters = configuration.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
+		List<Tuple2<String, Configuration>> reporterConfigurations;
+
+		if (definedReporters == null) {
+			reporterConfigurations = Collections.emptyList();
+		} else {
+			String[] namedReporters = splitPattern.split(definedReporters);
+
+			reporterConfigurations = new ArrayList<>(namedReporters.length);
+
+			for (String namedReporter: namedReporters) {
+				DelegatingConfiguration delegatingConfiguration = new DelegatingConfiguration(
+					configuration,
+					ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');
+
+				reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
+			}
+		}
+
+		return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations);
+	}
+
+	/**
+	 * Create the scope formats from the given {@link Configuration}.
+	 *
+	 * @param configuration to extract the scope formats from
+	 * @return Scope formats extracted from the given configuration
+	 */
+	static ScopeFormats createScopeConfig(Configuration configuration) {
+		String jmFormat = configuration.getString(
+			ConfigConstants.METRICS_SCOPE_NAMING_JM, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
+		String jmJobFormat = configuration.getString(
+			ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
+		String tmFormat = configuration.getString(
+			ConfigConstants.METRICS_SCOPE_NAMING_TM, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
+		String tmJobFormat = configuration.getString(
+			ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
+		String taskFormat = configuration.getString(
+			ConfigConstants.METRICS_SCOPE_NAMING_TASK, ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
+		String operatorFormat = configuration.getString(
+			ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+
+		return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, tmJobFormat, taskFormat, operatorFormat);
+	}
+
+	public static MetricRegistryConfiguration defaultMetricRegistryConfiguration() {
+		// create the default metric registry configuration only once
+		if (DEFAULT_CONFIGURATION == null) {
+			synchronized (MetricRegistryConfiguration.class) {
+				if (DEFAULT_CONFIGURATION == null) {
+					DEFAULT_CONFIGURATION = fromConfiguration(new Configuration());
+				}
+			}
+		}
+
+		return DEFAULT_CONFIGURATION;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a733943..869f6e5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -67,13 +67,13 @@ import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, Accum
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.InfoMessage
 import org.apache.flink.runtime.messages.webmonitor._
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration}
-import org.apache.flink.runtime.security.{SecurityContext}
+import org.apache.flink.runtime.security.SecurityContext
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -2635,7 +2635,7 @@ object JobManager {
     }
 
     val metricRegistry = try {
-      Option(new FlinkMetricRegistry(configuration))
+      Option(new FlinkMetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)))
     } catch {
       case _: Exception =>
         None

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 04f3168..c6dcbb0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -66,7 +66,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStack
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateRegistry
@@ -2081,7 +2081,8 @@ object TaskManager {
       case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
     }
 
-    val metricsRegistry = new FlinkMetricRegistry(configuration)
+    val metricsRegistry = new FlinkMetricRegistry(
+      MetricRegistryConfiguration.fromConfiguration(configuration))
 
     (taskManagerConfig,
       taskManagerLocation,

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index a58d910..70c2bf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.Slot;
@@ -99,7 +100,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 	
 			FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
 	
-			MetricRegistry metricRegistry = new MetricRegistry(config);
+			MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 	
 			assertTrue(metricRegistry.getReporters().size() == 1);
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index f2bb8fe..5f7defb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -48,7 +48,7 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 
-		MetricRegistry metricRegistry = new MetricRegistry(config);
+		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 		assertTrue(metricRegistry.getReporters().size() == 1);
 
@@ -78,7 +78,7 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
 
-		MetricRegistry metricRegistry = new MetricRegistry(config);
+		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 		assertTrue(metricRegistry.getReporters().size() == 3);
 
@@ -128,7 +128,7 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
 
-		new MetricRegistry(config).shutdown();
+		new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
 	}
 
 	protected static class TestReporter2 extends TestReporter {
@@ -153,7 +153,7 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
 
-		MetricRegistry registry = new MetricRegistry(config);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 		long start = System.currentTimeMillis();
 		for (int x = 0; x < 10; x++) {
@@ -195,7 +195,7 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
 
-		MetricRegistry registry = new MetricRegistry(config);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 		TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
 		root.counter("rootCounter");
@@ -259,7 +259,7 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "C");
 		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, "D");
 
-		ScopeFormats scopeConfig = MetricRegistry.createScopeConfig(config);
+		ScopeFormats scopeConfig = MetricRegistryConfiguration.createScopeConfig(config);
 
 		assertEquals("A", scopeConfig.getTaskManagerFormat().format());
 		assertEquals("B", scopeConfig.getTaskManagerJobFormat().format());
@@ -273,7 +273,7 @@ public class MetricRegistryTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
 		config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B.C.D.E");
 
-		MetricRegistry registry = new MetricRegistry(config);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id");
 		assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name"));

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 91563ec..e5ace68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
 import org.apache.flink.util.TestLogger;
@@ -76,7 +77,7 @@ public class MetricQueryServiceTest extends TestLogger {
 			}
 		};
 
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 
 		MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 78aac64..ca3810a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.flink.runtime.metrics.groups;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.junit.Test;
 
@@ -32,7 +32,7 @@ public class AbstractMetricGroupTest {
 	 */
 	@Test
 	public void testGetAllVariables() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		AbstractMetricGroup group = new AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) {
 			@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 1b9b24f..317d19e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
@@ -39,7 +40,7 @@ public class JobManagerGroupTest extends TestLogger {
 
 	@Test
 	public void addAndRemoveJobs() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
 
 		final JobID jid1 = new JobID();
@@ -71,7 +72,7 @@ public class JobManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testCloseClosesAll() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
 
 		final JobID jid1 = new JobID();
@@ -97,7 +98,7 @@ public class JobManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
 
 		assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents());
@@ -110,7 +111,7 @@ public class JobManagerGroupTest extends TestLogger {
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
-		MetricRegistry registry = new MetricRegistry(cfg);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host");
 
@@ -122,7 +123,7 @@ public class JobManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testCreateQueryServiceMetricInfo() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
 
 		QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter());

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index c3443f2..ed4056f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
@@ -34,7 +35,7 @@ public class JobManagerJobGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
 		JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -55,7 +56,7 @@ public class JobManagerJobGroupTest extends TestLogger {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "abc");
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
-		MetricRegistry registry = new MetricRegistry(cfg);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
 
@@ -78,7 +79,7 @@ public class JobManagerJobGroupTest extends TestLogger {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "peter");
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
-		MetricRegistry registry = new MetricRegistry(cfg);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
 
@@ -99,7 +100,7 @@ public class JobManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testCreateQueryServiceMetricInfo() {
 		JobID jid = new JobID();
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
 		JobManagerJobMetricGroup jmj = new JobManagerJobMetricGroup(registry, jm, jid, "jobname");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index a15b72e..f121b59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 
 import org.junit.Assert;
@@ -43,7 +44,7 @@ public class MetricGroupRegistrationTest {
 		config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 
-		MetricRegistry registry = new MetricRegistry(config);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 		MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
 
@@ -102,7 +103,7 @@ public class MetricGroupRegistrationTest {
 	public void testDuplicateGroupName() {
 		Configuration config = new Configuration();
 
-		MetricRegistry registry = new MetricRegistry(config);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
 		MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 3fe8d75..038fd1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
@@ -37,6 +37,8 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 public class MetricGroupTest extends TestLogger {
+
+	private static final MetricRegistryConfiguration defaultMetricRegistryConfiguration = MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
 	
 	private MetricRegistry registry;
 
@@ -44,7 +46,7 @@ public class MetricGroupTest extends TestLogger {
 
 	@Before
 	public void createRegistry() {
-		this.registry = new MetricRegistry(new Configuration());
+		this.registry = new MetricRegistry(defaultMetricRegistryConfiguration);
 	}
 
 	@After
@@ -122,7 +124,7 @@ public class MetricGroupTest extends TestLogger {
 		JobID jid = new JobID();
 		AbstractID vid = new AbstractID();
 		AbstractID eid = new AbstractID();
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(defaultMetricRegistryConfiguration);
 		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
 		TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
@@ -140,7 +142,7 @@ public class MetricGroupTest extends TestLogger {
 	private static class ExceptionOnRegisterRegistry extends MetricRegistry {
 		
 		public ExceptionOnRegisterRegistry() {
-			super(new Configuration());
+			super(defaultMetricRegistryConfiguration);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 2357936..7f82d21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
@@ -39,7 +40,7 @@ public class OperatorGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
 		TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -60,7 +61,7 @@ public class OperatorGroupTest extends TestLogger {
 
 	@Test
 	public void testVariables() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		JobID jid = new JobID();
 		AbstractID tid = new AbstractID();
@@ -99,7 +100,7 @@ public class OperatorGroupTest extends TestLogger {
 		JobID jid = new JobID();
 		AbstractID vid = new AbstractID();
 		AbstractID eid = new AbstractID();
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
 		TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index a68e59d..49f2268 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
@@ -51,7 +52,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 	@Test
 	public void addAndRemoveJobs() throws IOException {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
 				registry, "localhost", new AbstractID().toString());
@@ -170,7 +171,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testCloseClosesAll() throws IOException {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
 				registry, "localhost", new AbstractID().toString());
 
@@ -253,7 +254,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id");
 
 		assertArrayEquals(new String[] { "localhost", "taskmanager", "id" }, group.getScopeComponents());
@@ -265,7 +266,7 @@ public class TaskManagerGroupTest extends TestLogger {
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
-		MetricRegistry registry = new MetricRegistry(cfg);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
 
 		assertArrayEquals(new String[] { "constant", "host", "foo", "host" }, group.getScopeComponents());
@@ -275,7 +276,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testCreateQueryServiceMetricInfo() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 
 		QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter());

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index 175ded1..f9bef6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
@@ -35,7 +36,7 @@ public class TaskManagerJobGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
 		JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -55,7 +56,7 @@ public class TaskManagerJobGroupTest extends TestLogger {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "some-constant.<job_name>");
-		MetricRegistry registry = new MetricRegistry(cfg);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
 
@@ -77,7 +78,7 @@ public class TaskManagerJobGroupTest extends TestLogger {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "peter.<tm_id>");
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "*.some-constant.<job_id>");
-		MetricRegistry registry = new MetricRegistry(cfg);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
 
@@ -97,7 +98,7 @@ public class TaskManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testCreateQueryServiceMetricInfo() {
 		JobID jid = new JobID();
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index c65c1da..b2ae69e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
@@ -42,7 +43,7 @@ public class TaskMetricGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		AbstractID vertexId = new AbstractID();
 		AbstractID executionId = new AbstractID();
 
@@ -66,7 +67,7 @@ public class TaskMetricGroupTest extends TestLogger {
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, "def");
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
-		MetricRegistry registry = new MetricRegistry(cfg);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
 		AbstractID vertexId = new AbstractID();
@@ -91,7 +92,7 @@ public class TaskMetricGroupTest extends TestLogger {
 	public void testGenerateScopeWilcard() {
 		Configuration cfg = new Configuration();
 		cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, "*.<task_attempt_id>.<subtask_index>");
-		MetricRegistry registry = new MetricRegistry(cfg);
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		AbstractID executionId = new AbstractID();
 
@@ -116,7 +117,7 @@ public class TaskMetricGroupTest extends TestLogger {
 		JobID jid = new JobID();
 		AbstractID vid = new AbstractID();
 		AbstractID eid = new AbstractID();
-		MetricRegistry registry = new MetricRegistry(new Configuration());
+		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
 		TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
@@ -151,7 +152,7 @@ public class TaskMetricGroupTest extends TestLogger {
 		private int counter = 0;
 
 		CountingMetricRegistry(Configuration config) {
-			super(config);
+			super(MetricRegistryConfiguration.fromConfiguration(config));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index ff0c480..b4a7400 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.operators.testutils;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -35,7 +35,7 @@ import java.util.UUID;
 
 public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
 	
-	private static final MetricRegistry EMPTY_REGISTRY = new MetricRegistry(new Configuration());
+	private static final MetricRegistry EMPTY_REGISTRY = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 	
 	public UnregisteredTaskMetricsGroup() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index bc83db9..1f93e9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -48,6 +48,7 @@ import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
@@ -129,6 +130,8 @@ public class TaskManagerComponentsStartupShutdownTest {
 
 			LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService(jobManager.path().toString());
 
+			MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
+
 			// create the task manager
 			final Props tmProps = Props.create(
 				TaskManager.class,
@@ -140,7 +143,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 				network,
 				numberOfSlots,
 				leaderRetrievalService,
-				new MetricRegistry(config));
+				new MetricRegistry(metricRegistryConfiguration));
 
 			final ActorRef taskManager = actorSystem.actorOf(tmProps);