You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 22:27:42 UTC

[2/7] flink git commit: [FLINK-7876] Register TaskManagerMetricGroup under ResourceID

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 1a3ca70..d8e65a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -502,7 +502,7 @@ public class ResourceManagerTest extends TestLogger {
 		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
 
-		final MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		final MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class);
 		final JobLeaderIdService jobLeaderIdService = mock(JobLeaderIdService.class);
 		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final SlotManager slotManager = new SlotManager(
@@ -601,7 +601,7 @@ public class ResourceManagerTest extends TestLogger {
 		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 		final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
 
-		final MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		final MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class);
 		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
 			highAvailabilityServices,
 			rpcService.getScheduledExecutor(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d1ca757..8558145 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -114,7 +114,7 @@ public class DispatcherTest extends TestLogger {
 			mock(ResourceManagerGateway.class),
 			mock(BlobServer.class),
 			heartbeatServices,
-			mock(MetricRegistry.class),
+			mock(MetricRegistryImpl.class),
 			fatalErrorHandler,
 			jobManagerRunner,
 			jobId);
@@ -174,7 +174,7 @@ public class DispatcherTest extends TestLogger {
 			mock(ResourceManagerGateway.class),
 			mock(BlobServer.class),
 			heartbeatServices,
-			mock(MetricRegistry.class),
+			mock(MetricRegistryImpl.class),
 			fatalErrorHandler,
 			mock(JobManagerRunner.class),
 			jobId);
@@ -209,7 +209,7 @@ public class DispatcherTest extends TestLogger {
 				ResourceManagerGateway resourceManagerGateway,
 				BlobServer blobServer,
 				HeartbeatServices heartbeatServices,
-				MetricRegistry metricRegistry,
+				MetricRegistryImpl metricRegistry,
 				FatalErrorHandler fatalErrorHandler,
 				JobManagerRunner jobManagerRunner,
 				JobID expectedJobId) throws Exception {
@@ -238,7 +238,7 @@ public class DispatcherTest extends TestLogger {
 				HighAvailabilityServices highAvailabilityServices,
 				HeartbeatServices heartbeatServices,
 				JobManagerServices jobManagerServices,
-				MetricRegistry metricRegistry,
+				MetricRegistryImpl metricRegistry,
 				OnCompletionActions onCompleteActions,
 				FatalErrorHandler fatalErrorHandler) throws Exception {
 			assertEquals(expectedJobId, jobGraph.getJobID());

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 7df26fc..d843da2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -62,7 +62,9 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -206,7 +208,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				mySubmittedJobGraphStore,
 				checkpointStateFactory,
 				jobRecoveryTimeout,
-				Option.<MetricRegistry>empty(),
+				Option.<MetricRegistryImpl>empty(),
 				Option.<String>empty());
 
 			jobManager = system.actorOf(jobManagerProps);
@@ -217,6 +219,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				ResourceID.generate(),
 				system,
 				testingHighAvailabilityServices,
+				new NoOpMetricRegistry(),
 				"localhost",
 				Option.apply("taskmanager"),
 				true,
@@ -380,7 +383,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				submittedJobGraphStore,
 				mock(CheckpointRecoveryFactory.class),
 				jobRecoveryTimeout,
-				Option.<MetricRegistry>apply(null),
+				Option.<MetricRegistryImpl>apply(null),
 				recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
 
 			jobManager = system.actorOf(jobManagerProps);
@@ -418,7 +421,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 			SubmittedJobGraphStore submittedJobGraphs,
 			CheckpointRecoveryFactory checkpointRecoveryFactory,
 			FiniteDuration jobRecoveryTimeout,
-			Option<MetricRegistry> metricsRegistry,
+			JobManagerMetricGroup jobManagerMetricGroup,
 			Collection<JobID> recoveredJobs) {
 			super(
 				flinkConfiguration,
@@ -435,7 +438,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				submittedJobGraphs,
 				checkpointRecoveryFactory,
 				jobRecoveryTimeout,
-				metricsRegistry,
+				jobManagerMetricGroup,
 				Option.empty());
 
 			this.recoveredJobs = recoveredJobs;

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index bd7f11f..a697aae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -75,6 +75,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
 import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
@@ -624,6 +625,7 @@ public class JobManagerTest extends TestLogger {
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			highAvailabilityServices,
+			new NoOpMetricRegistry(),
 			Option.empty(),
 			TestingJobManager.class,
 			MemoryArchivist.class)._1();
@@ -645,6 +647,7 @@ public class JobManagerTest extends TestLogger {
 			ResourceID.generate(),
 			system,
 			highAvailabilityServices,
+			new NoOpMetricRegistry(),
 			"localhost",
 			scala.Option.<String>empty(),
 			true,
@@ -841,6 +844,7 @@ public class JobManagerTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
+				new NoOpMetricRegistry(),
 				Option.empty(),
 				Option.apply("jm"),
 				Option.apply("arch"),
@@ -859,6 +863,7 @@ public class JobManagerTest extends TestLogger {
 				ResourceID.generate(),
 				actorSystem,
 				highAvailabilityServices,
+				new NoOpMetricRegistry(),
 				"localhost",
 				Option.apply("tm"),
 				true,
@@ -1051,6 +1056,7 @@ public class JobManagerTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
+				new NoOpMetricRegistry(),
 				Option.empty(),
 				Option.apply("jm"),
 				Option.apply("arch"),
@@ -1069,6 +1075,7 @@ public class JobManagerTest extends TestLogger {
 				ResourceID.generate(),
 				actorSystem,
 				highAvailabilityServices,
+				new NoOpMetricRegistry(),
 				"localhost",
 				Option.apply("tm"),
 				true,
@@ -1164,6 +1171,7 @@ public class JobManagerTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
+				new NoOpMetricRegistry(),
 				Option.empty(),
 				Option.apply("jm"),
 				Option.apply("arch"),
@@ -1182,6 +1190,7 @@ public class JobManagerTest extends TestLogger {
 				ResourceID.generate(),
 				actorSystem,
 				highAvailabilityServices,
+				new NoOpMetricRegistry(),
 				"localhost",
 				Option.apply("tm"),
 				true,
@@ -1275,6 +1284,7 @@ public class JobManagerTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
+				new NoOpMetricRegistry(),
 				Option.empty(),
 				Option.apply("jm"),
 				Option.apply("arch"),
@@ -1296,6 +1306,7 @@ public class JobManagerTest extends TestLogger {
 				ResourceID.generate(),
 				actorSystem,
 				highAvailabilityServices,
+				new NoOpMetricRegistry(),
 				"localhost",
 				Option.apply("tm"),
 				true,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 340a735..cc93f18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -94,6 +95,7 @@ public class JobSubmitTest {
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			highAvailabilityServices,
+			new NoOpMetricRegistry(),
 			Option.empty(),
 			JobManager.class,
 			MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index b4f50fb..083d6e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -115,7 +115,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 			haServices,
 			heartbeatServices,
 			JobManagerServices.fromConfiguration(new Configuration(), mock(BlobServer.class)),
-			new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
+			new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
 			jobCompletion,
 			jobCompletion));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index c3b57fa..e4ceb40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -28,7 +28,6 @@ import akka.util.Timeout;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -43,7 +42,7 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -199,7 +198,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 			submittedJobGraphStore,
 			checkpointRecoveryFactory,
 			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-			Option.<MetricRegistry>empty(),
+			Option.<MetricRegistryImpl>empty(),
 			Option.<String>empty());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
new file mode 100644
index 0000000..b0b20b2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link MetricRegistryImpl}.
+ */
+public class MetricRegistryImplTest extends TestLogger {
+
+	private static final char GLOBAL_DEFAULT_DELIMITER = '.';
+
+	@Test
+	public void testIsShutdown() {
+		MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+
+		Assert.assertFalse(metricRegistry.isShutdown());
+
+		metricRegistry.shutdown();
+
+		Assert.assertTrue(metricRegistry.isShutdown());
+	}
+
+	/**
+	 * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
+	 */
+	@Test
+	public void testReporterInstantiation() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
+
+		MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+		assertTrue(metricRegistry.getReporters().size() == 1);
+
+		Assert.assertTrue(TestReporter1.wasOpened);
+
+		metricRegistry.shutdown();
+	}
+
+	/**
+	 * Reporter that exposes whether open() was called.
+	 */
+	protected static class TestReporter1 extends TestReporter {
+		public static boolean wasOpened = false;
+
+		@Override
+		public void open(MetricConfig config) {
+			wasOpened = true;
+		}
+	}
+
+	/**
+	 * Verifies that multiple reporters are instantiated correctly.
+	 */
+	@Test
+	public void testMultipleReporterInstantiation() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
+
+		MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+		assertTrue(metricRegistry.getReporters().size() == 3);
+
+		Assert.assertTrue(TestReporter11.wasOpened);
+		Assert.assertTrue(TestReporter12.wasOpened);
+		Assert.assertTrue(TestReporter13.wasOpened);
+
+		metricRegistry.shutdown();
+	}
+
+	/**
+	 * Reporter that exposes whether open() was called.
+	 */
+	protected static class TestReporter11 extends TestReporter {
+		public static boolean wasOpened = false;
+
+		@Override
+		public void open(MetricConfig config) {
+			wasOpened = true;
+		}
+	}
+
+	/**
+	 * Reporter that exposes whether open() was called.
+	 */
+	protected static class TestReporter12 extends TestReporter {
+		public static boolean wasOpened = false;
+
+		@Override
+		public void open(MetricConfig config) {
+			wasOpened = true;
+		}
+	}
+
+	/**
+	 * Reporter that exposes whether open() was called.
+	 */
+	protected static class TestReporter13 extends TestReporter {
+		public static boolean wasOpened = false;
+
+		@Override
+		public void open(MetricConfig config) {
+			wasOpened = true;
+		}
+	}
+
+	/**
+	 * Verifies that configured arguments are properly forwarded to the reporter.
+	 */
+	@Test
+	public void testReporterArgumentForwarding() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
+
+		new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
+
+		Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null));
+		Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null));
+	}
+
+	/**
+	 * Reporter that exposes the {@link MetricConfig} it was given.
+	 */
+	protected static class TestReporter2 extends TestReporter {
+		static MetricConfig mc;
+		@Override
+		public void open(MetricConfig config) {
+			mc = config;
+		}
+	}
+
+	/**
+	 * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics.
+	 *
+	 * @throws InterruptedException
+	 */
+	@Test
+	public void testReporterScheduling() throws InterruptedException {
+		Configuration config = new Configuration();
+
+		config.setString(MetricOptions.REPORTERS_LIST, "test");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
+
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+		long start = System.currentTimeMillis();
+
+		// only start counting from now on
+		TestReporter3.reportCount = 0;
+
+		for (int x = 0; x < 10; x++) {
+			Thread.sleep(100);
+			int reportCount = TestReporter3.reportCount;
+			long curT = System.currentTimeMillis();
+			/**
+			 * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports.
+			 * This value however does not not take the first triggered report into account (=> +1).
+			 * Furthermore we have to account for the mis-alignment between reports being triggered and our time
+			 * measurement (=> +1); for T=200 a total of 4-6 reports may have been
+			 * triggered depending on whether the end of the interval for the first reports ends before
+			 * or after T=50.
+			 */
+			long maxAllowedReports = (curT - start) / 50 + 2;
+			Assert.assertTrue("Too many reports were triggered.", maxAllowedReports >= reportCount);
+		}
+		Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0);
+
+		registry.shutdown();
+	}
+
+	/**
+	 * Reporter that exposes how often report() was called.
+	 */
+	protected static class TestReporter3 extends TestReporter implements Scheduled {
+		public static int reportCount = 0;
+
+		@Override
+		public void report() {
+			reportCount++;
+		}
+	}
+
+	/**
+	 * Verifies that reporters are notified of added/removed metrics.
+	 */
+	@Test
+	public void testReporterNotifications() {
+		Configuration config = new Configuration();
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
+
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+		TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
+		root.counter("rootCounter");
+
+		assertTrue(TestReporter6.addedMetric instanceof Counter);
+		assertEquals("rootCounter", TestReporter6.addedMetricName);
+
+		assertTrue(TestReporter7.addedMetric instanceof Counter);
+		assertEquals("rootCounter", TestReporter7.addedMetricName);
+
+		root.close();
+
+		assertTrue(TestReporter6.removedMetric instanceof Counter);
+		assertEquals("rootCounter", TestReporter6.removedMetricName);
+
+		assertTrue(TestReporter7.removedMetric instanceof Counter);
+		assertEquals("rootCounter", TestReporter7.removedMetricName);
+
+		registry.shutdown();
+	}
+
+	/**
+	 * Reporter that exposes the name and metric instance of the last metric that was added or removed.
+	 */
+	protected static class TestReporter6 extends TestReporter {
+		static Metric addedMetric;
+		static String addedMetricName;
+
+		static Metric removedMetric;
+		static String removedMetricName;
+
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+			addedMetric = metric;
+			addedMetricName = metricName;
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+			removedMetric = metric;
+			removedMetricName = metricName;
+		}
+	}
+
+	/**
+	 * Reporter that exposes the name and metric instance of the last metric that was added or removed.
+	 */
+	protected static class TestReporter7 extends TestReporter {
+		static Metric addedMetric;
+		static String addedMetricName;
+
+		static Metric removedMetric;
+		static String removedMetricName;
+
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+			addedMetric = metric;
+			addedMetricName = metricName;
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+			removedMetric = metric;
+			removedMetricName = metricName;
+		}
+	}
+
+	/**
+	 * Verifies that the scope configuration is properly extracted.
+	 */
+	@Test
+	public void testScopeConfig() {
+		Configuration config = new Configuration();
+
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
+		config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
+		config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
+		config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
+
+		ScopeFormats scopeConfig = ScopeFormats.fromConfig(config);
+
+		assertEquals("A", scopeConfig.getTaskManagerFormat().format());
+		assertEquals("B", scopeConfig.getTaskManagerJobFormat().format());
+		assertEquals("C", scopeConfig.getTaskFormat().format());
+		assertEquals("D", scopeConfig.getOperatorFormat().format());
+	}
+
+	@Test
+	public void testConfigurableDelimiter() {
+		Configuration config = new Configuration();
+		config.setString(MetricOptions.SCOPE_DELIMITER, "_");
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
+
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id");
+		assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name"));
+
+		registry.shutdown();
+	}
+
+	@Test
+	public void testConfigurableDelimiterForReporters() {
+		Configuration config = new Configuration();
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
+
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+		assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter());
+		assertEquals('_', registry.getDelimiter(0));
+		assertEquals('-', registry.getDelimiter(1));
+		assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(2));
+		assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3));
+		assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1));
+
+		registry.shutdown();
+	}
+
+	@Test
+	public void testConfigurableDelimiterForReportersInGroup() {
+		Configuration config = new Configuration();
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
+		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
+
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+		List<MetricReporter> reporters = registry.getReporters();
+		((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1  reporter
+		((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter
+		((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
+		((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
+
+		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
+		group.counter("C");
+		group.close();
+		registry.shutdown();
+		assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
+		assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister);
+	}
+
+	/**
+	 * Tests that the query actor will be stopped when the MetricRegistry is shut down.
+	 */
+	@Test
+	public void testQueryActorShutdown() throws Exception {
+		final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
+
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+
+		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+		registry.startQueryService(actorSystem, null);
+
+		ActorRef queryServiceActor = registry.getQueryService();
+
+		registry.shutdown();
+
+		try {
+			Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout);
+
+			fail("The query actor should be terminated resulting in a ActorNotFound exception.");
+		} catch (ActorNotFound e) {
+			// we expect the query actor to be shut down
+		}
+	}
+
+	/**
+	 * Reporter that verifies that the configured delimiter is applied correctly when generating the metric identifier.
+	 */
+	public static class TestReporter8 extends TestReporter {
+		char expectedDelimiter;
+		public static int numCorrectDelimitersForRegister = 0;
+		public static int numCorrectDelimitersForUnregister = 0;
+
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+			String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C";
+			assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this));
+			assertEquals(expectedMetric, group.getMetricIdentifier(metricName));
+			numCorrectDelimitersForRegister++;
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+			String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C";
+			assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this));
+			assertEquals(expectedMetric, group.getMetricIdentifier(metricName));
+			numCorrectDelimitersForUnregister++;
+		}
+	}
+
+	@Test
+	public void testExceptionIsolation() throws Exception {
+
+		Configuration config = new Configuration();
+		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, FailingReporter.class.getName());
+		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
+
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+		Counter metric = new SimpleCounter();
+		registry.register(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry));
+
+		assertEquals(metric, TestReporter7.addedMetric);
+		assertEquals("counter", TestReporter7.addedMetricName);
+
+		registry.unregister(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry));
+
+		assertEquals(metric, TestReporter7.removedMetric);
+		assertEquals("counter", TestReporter7.removedMetricName);
+
+		registry.shutdown();
+	}
+
+	/**
+	 * Reporter that throws an exception when it is notified of an added or removed metric.
+	 */
+	protected static class FailingReporter extends TestReporter {
+		@Override
+		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+			throw new RuntimeException();
+		}
+
+		@Override
+		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+			throw new RuntimeException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
deleted file mode 100644
index 284b86a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.metrics;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormats;
-import org.apache.flink.runtime.metrics.util.TestReporter;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorNotFound;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the {@link MetricRegistry}.
- */
-public class MetricRegistryTest extends TestLogger {
-
-	private static final char GLOBAL_DEFAULT_DELIMITER = '.';
-
-	@Test
-	public void testIsShutdown() {
-		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
-		Assert.assertFalse(metricRegistry.isShutdown());
-
-		metricRegistry.shutdown();
-
-		Assert.assertTrue(metricRegistry.isShutdown());
-	}
-
-	/**
-	 * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
-	 */
-	@Test
-	public void testReporterInstantiation() {
-		Configuration config = new Configuration();
-
-		config.setString(MetricOptions.REPORTERS_LIST, "test");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
-
-		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-		assertTrue(metricRegistry.getReporters().size() == 1);
-
-		Assert.assertTrue(TestReporter1.wasOpened);
-
-		metricRegistry.shutdown();
-	}
-
-	/**
-	 * Reporter that exposes whether open() was called.
-	 */
-	protected static class TestReporter1 extends TestReporter {
-		public static boolean wasOpened = false;
-
-		@Override
-		public void open(MetricConfig config) {
-			wasOpened = true;
-		}
-	}
-
-	/**
-	 * Verifies that multiple reporters are instantiated correctly.
-	 */
-	@Test
-	public void testMultipleReporterInstantiation() {
-		Configuration config = new Configuration();
-
-		config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
-
-		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-		assertTrue(metricRegistry.getReporters().size() == 3);
-
-		Assert.assertTrue(TestReporter11.wasOpened);
-		Assert.assertTrue(TestReporter12.wasOpened);
-		Assert.assertTrue(TestReporter13.wasOpened);
-
-		metricRegistry.shutdown();
-	}
-
-	/**
-	 * Reporter that exposes whether open() was called.
-	 */
-	protected static class TestReporter11 extends TestReporter {
-		public static boolean wasOpened = false;
-
-		@Override
-		public void open(MetricConfig config) {
-			wasOpened = true;
-		}
-	}
-
-	/**
-	 * Reporter that exposes whether open() was called.
-	 */
-	protected static class TestReporter12 extends TestReporter {
-		public static boolean wasOpened = false;
-
-		@Override
-		public void open(MetricConfig config) {
-			wasOpened = true;
-		}
-	}
-
-	/**
-	 * Reporter that exposes whether open() was called.
-	 */
-	protected static class TestReporter13 extends TestReporter {
-		public static boolean wasOpened = false;
-
-		@Override
-		public void open(MetricConfig config) {
-			wasOpened = true;
-		}
-	}
-
-	/**
-	 * Verifies that configured arguments are properly forwarded to the reporter.
-	 */
-	@Test
-	public void testReporterArgumentForwarding() {
-		Configuration config = new Configuration();
-
-		config.setString(MetricOptions.REPORTERS_LIST, "test");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
-
-		new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
-
-		Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null));
-		Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null));
-	}
-
-	/**
-	 * Reporter that exposes the {@link MetricConfig} it was given.
-	 */
-	protected static class TestReporter2 extends TestReporter {
-		static MetricConfig mc;
-		@Override
-		public void open(MetricConfig config) {
-			mc = config;
-		}
-	}
-
-	/**
-	 * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics.
-	 *
-	 * @throws InterruptedException
-	 */
-	@Test
-	public void testReporterScheduling() throws InterruptedException {
-		Configuration config = new Configuration();
-
-		config.setString(MetricOptions.REPORTERS_LIST, "test");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
-
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-		long start = System.currentTimeMillis();
-
-		// only start counting from now on
-		TestReporter3.reportCount = 0;
-
-		for (int x = 0; x < 10; x++) {
-			Thread.sleep(100);
-			int reportCount = TestReporter3.reportCount;
-			long curT = System.currentTimeMillis();
-			/**
-			 * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports.
-			 * This value however does not not take the first triggered report into account (=> +1).
-			 * Furthermore we have to account for the mis-alignment between reports being triggered and our time
-			 * measurement (=> +1); for T=200 a total of 4-6 reports may have been
-			 * triggered depending on whether the end of the interval for the first reports ends before
-			 * or after T=50.
-			 */
-			long maxAllowedReports = (curT - start) / 50 + 2;
-			Assert.assertTrue("Too many reports were triggered.", maxAllowedReports >= reportCount);
-		}
-		Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0);
-
-		registry.shutdown();
-	}
-
-	/**
-	 * Reporter that exposes how often report() was called.
-	 */
-	protected static class TestReporter3 extends TestReporter implements Scheduled {
-		public static int reportCount = 0;
-
-		@Override
-		public void report() {
-			reportCount++;
-		}
-	}
-
-	/**
-	 * Verifies that reporters are notified of added/removed metrics.
-	 */
-	@Test
-	public void testReporterNotifications() {
-		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
-
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-		TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
-		root.counter("rootCounter");
-
-		assertTrue(TestReporter6.addedMetric instanceof Counter);
-		assertEquals("rootCounter", TestReporter6.addedMetricName);
-
-		assertTrue(TestReporter7.addedMetric instanceof Counter);
-		assertEquals("rootCounter", TestReporter7.addedMetricName);
-
-		root.close();
-
-		assertTrue(TestReporter6.removedMetric instanceof Counter);
-		assertEquals("rootCounter", TestReporter6.removedMetricName);
-
-		assertTrue(TestReporter7.removedMetric instanceof Counter);
-		assertEquals("rootCounter", TestReporter7.removedMetricName);
-
-		registry.shutdown();
-	}
-
-	/**
-	 * Reporter that exposes the name and metric instance of the last metric that was added or removed.
-	 */
-	protected static class TestReporter6 extends TestReporter {
-		static Metric addedMetric;
-		static String addedMetricName;
-
-		static Metric removedMetric;
-		static String removedMetricName;
-
-		@Override
-		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
-			addedMetric = metric;
-			addedMetricName = metricName;
-		}
-
-		@Override
-		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
-			removedMetric = metric;
-			removedMetricName = metricName;
-		}
-	}
-
-	/**
-	 * Reporter that exposes the name and metric instance of the last metric that was added or removed.
-	 */
-	protected static class TestReporter7 extends TestReporter {
-		static Metric addedMetric;
-		static String addedMetricName;
-
-		static Metric removedMetric;
-		static String removedMetricName;
-
-		@Override
-		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
-			addedMetric = metric;
-			addedMetricName = metricName;
-		}
-
-		@Override
-		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
-			removedMetric = metric;
-			removedMetricName = metricName;
-		}
-	}
-
-	/**
-	 * Verifies that the scope configuration is properly extracted.
-	 */
-	@Test
-	public void testScopeConfig() {
-		Configuration config = new Configuration();
-
-		config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
-		config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
-		config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
-		config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
-
-		ScopeFormats scopeConfig = ScopeFormats.fromConfig(config);
-
-		assertEquals("A", scopeConfig.getTaskManagerFormat().format());
-		assertEquals("B", scopeConfig.getTaskManagerJobFormat().format());
-		assertEquals("C", scopeConfig.getTaskFormat().format());
-		assertEquals("D", scopeConfig.getOperatorFormat().format());
-	}
-
-	@Test
-	public void testConfigurableDelimiter() {
-		Configuration config = new Configuration();
-		config.setString(MetricOptions.SCOPE_DELIMITER, "_");
-		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
-
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id");
-		assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name"));
-
-		registry.shutdown();
-	}
-
-	@Test
-	public void testConfigurableDelimiterForReporters() {
-		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
-
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-		assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter());
-		assertEquals('_', registry.getDelimiter(0));
-		assertEquals('-', registry.getDelimiter(1));
-		assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(2));
-		assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3));
-		assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1));
-
-		registry.shutdown();
-	}
-
-	@Test
-	public void testConfigurableDelimiterForReportersInGroup() {
-		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
-		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
-
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-		List<MetricReporter> reporters = registry.getReporters();
-		((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1  reporter
-		((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter
-		((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
-		((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
-
-		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
-		group.counter("C");
-		group.close();
-		registry.shutdown();
-		assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
-		assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister);
-	}
-
-	/**
-	 * Tests that the query actor will be stopped when the MetricRegistry is shut down.
-	 */
-	@Test
-	public void testQueryActorShutdown() throws Exception {
-		final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
-
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
-		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
-
-		registry.startQueryService(actorSystem, null);
-
-		ActorRef queryServiceActor = registry.getQueryService();
-
-		registry.shutdown();
-
-		try {
-			Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout);
-
-			fail("The query actor should be terminated resulting in a ActorNotFound exception.");
-		} catch (ActorNotFound e) {
-			// we expect the query actor to be shut down
-		}
-	}
-
-	/**
-	 * Reporter that verifies that the configured delimiter is applied correctly when generating the metric identifier.
-	 */
-	public static class TestReporter8 extends TestReporter {
-		char expectedDelimiter;
-		public static int numCorrectDelimitersForRegister = 0;
-		public static int numCorrectDelimitersForUnregister = 0;
-
-		@Override
-		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
-			String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C";
-			assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this));
-			assertEquals(expectedMetric, group.getMetricIdentifier(metricName));
-			numCorrectDelimitersForRegister++;
-		}
-
-		@Override
-		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
-			String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C";
-			assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this));
-			assertEquals(expectedMetric, group.getMetricIdentifier(metricName));
-			numCorrectDelimitersForUnregister++;
-		}
-	}
-
-	@Test
-	public void testExceptionIsolation() throws Exception {
-
-		Configuration config = new Configuration();
-		config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, FailingReporter.class.getName());
-		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
-
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
-		Counter metric = new SimpleCounter();
-		registry.register(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry));
-
-		assertEquals(metric, TestReporter7.addedMetric);
-		assertEquals("counter", TestReporter7.addedMetricName);
-
-		registry.unregister(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry));
-
-		assertEquals(metric, TestReporter7.removedMetric);
-		assertEquals("counter", TestReporter7.removedMetricName);
-
-		registry.shutdown();
-	}
-
-	/**
-	 * Reporter that throws an exception when it is notified of an added or removed metric.
-	 */
-	protected static class FailingReporter extends TestReporter {
-		@Override
-		public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
-			throw new RuntimeException();
-		}
-
-		@Override
-		public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
-			throw new RuntimeException();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
new file mode 100644
index 0000000..1140e3d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+/**
+ * Metric registry which does nothing and is intended for testing purposes.
+ */
+public class NoOpMetricRegistry implements MetricRegistry {
+
+	final char delimiter = ',';
+
+	final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration());
+
+	@Override
+	public char getDelimiter() {
+		return delimiter;
+	}
+
+	@Override
+	public char getDelimiter(int index) {
+		return delimiter;
+	}
+
+	@Override
+	public int getNumberReporters() {
+		return 0;
+	}
+
+	@Override
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {}
+
+	@Override
+	public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {}
+
+	@Override
+	public ScopeFormats getScopeFormats() {
+		return scopeFormats;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index cea0928..31304e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -61,6 +61,9 @@ public class TaskManagerMetricsTest extends TestLogger {
 
 		HighAvailabilityServices highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
 
+		final MetricRegistryImpl metricRegistry = new MetricRegistryImpl(
+			MetricRegistryConfiguration.fromConfiguration(new Configuration()));
+
 		try {
 			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
 
@@ -73,6 +76,7 @@ public class TaskManagerMetricsTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
+				new NoOpMetricRegistry(),
 				Option.empty(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
@@ -89,9 +93,9 @@ public class TaskManagerMetricsTest extends TestLogger {
 			TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config);
 
 			TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
-					taskManagerServicesConfiguration, tmResourceID);
-
-			final MetricRegistry tmRegistry = taskManagerServices.getMetricRegistry();
+				taskManagerServicesConfiguration,
+				tmResourceID,
+				metricRegistry);
 
 			// create the task manager
 			final Props tmProps = TaskManager.getTaskManagerProps(
@@ -103,7 +107,7 @@ public class TaskManagerMetricsTest extends TestLogger {
 				taskManagerServices.getIOManager(),
 				taskManagerServices.getNetworkEnvironment(),
 				highAvailabilityServices,
-				tmRegistry);
+				taskManagerServices.getTaskManagerMetricGroup());
 
 			final ActorRef taskManager = actorSystem.actorOf(tmProps);
 
@@ -135,7 +139,7 @@ public class TaskManagerMetricsTest extends TestLogger {
 			}};
 
 			// verify that the registry was not shutdown due to the disconnect
-			Assert.assertFalse(tmRegistry.isShutdown());
+			Assert.assertFalse(metricRegistry.isShutdown());
 
 			// shut down the actors and the actor system
 			actorSystem.shutdown();
@@ -148,6 +152,8 @@ public class TaskManagerMetricsTest extends TestLogger {
 			if (highAvailabilityServices != null) {
 				highAvailabilityServices.closeAndCleanupAllData();
 			}
+
+			metricRegistry.shutdown();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 5c33ad6..55ba3a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
 import org.apache.flink.util.TestLogger;
@@ -82,7 +82,7 @@ public class MetricQueryServiceTest extends TestLogger {
 			}
 		};
 
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 
 		MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 648ee47..8d91b81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.TestReporter;
 
@@ -45,7 +45,7 @@ public class AbstractMetricGroupTest {
 	 */
 	@Test
 	public void testGetAllVariables() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		AbstractMetricGroup group = new AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) {
 			@Override
@@ -90,7 +90,7 @@ public class AbstractMetricGroupTest {
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
 
-		MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+		MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 		try {
 			MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
 			tmGroup.counter("1");
@@ -180,7 +180,7 @@ public class AbstractMetricGroupTest {
 	public void testScopeGenerationWithoutReporters() {
 		Configuration config = new Configuration();
 		config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
-		MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+		MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
 		try {
 			TaskManagerMetricGroup group = new TaskManagerMetricGroup(testRegistry, "host", "id");

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 03341a6..05a72ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
@@ -45,7 +45,7 @@ public class JobManagerGroupTest extends TestLogger {
 
 	@Test
 	public void addAndRemoveJobs() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
 
 		final JobID jid1 = new JobID();
@@ -77,7 +77,7 @@ public class JobManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testCloseClosesAll() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
 
 		final JobID jid1 = new JobID();
@@ -103,7 +103,7 @@ public class JobManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
 
 		assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents());
@@ -116,7 +116,7 @@ public class JobManagerGroupTest extends TestLogger {
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host");
 
@@ -128,7 +128,7 @@ public class JobManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testCreateQueryServiceMetricInfo() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
 
 		QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter());

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index d734dfd..4373f80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.TestLogger;
@@ -39,7 +39,7 @@ public class JobManagerJobGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
 		JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -60,7 +60,7 @@ public class JobManagerJobGroupTest extends TestLogger {
 		Configuration cfg = new Configuration();
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
 
@@ -83,7 +83,7 @@ public class JobManagerJobGroupTest extends TestLogger {
 		Configuration cfg = new Configuration();
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
 		cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		JobID jid = new JobID();
 
@@ -104,7 +104,7 @@ public class JobManagerJobGroupTest extends TestLogger {
 	@Test
 	public void testCreateQueryServiceMetricInfo() {
 		JobID jid = new JobID();
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
 		JobManagerJobMetricGroup jmj = new JobManagerJobMetricGroup(registry, jm, jid, "jobname");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index 56ce5fa..324bb73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -27,9 +27,10 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,7 +40,7 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for the registration of groups and metrics on a {@link MetricGroup}.
  */
-public class MetricGroupRegistrationTest {
+public class MetricGroupRegistrationTest extends TestLogger {
 	/**
 	 * Verifies that group methods instantiate the correct metric with the given name.
 	 */
@@ -49,7 +50,7 @@ public class MetricGroupRegistrationTest {
 		config.setString(MetricOptions.REPORTERS_LIST, "test");
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
 
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
 		MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
 
@@ -111,7 +112,7 @@ public class MetricGroupRegistrationTest {
 	public void testDuplicateGroupName() {
 		Configuration config = new Configuration();
 
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
 		MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 633dbed..94760e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
@@ -48,13 +49,13 @@ public class MetricGroupTest extends TestLogger {
 
 	private static final MetricRegistryConfiguration defaultMetricRegistryConfiguration = MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
 
-	private MetricRegistry registry;
+	private MetricRegistryImpl registry;
 
-	private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry();
+	private final MetricRegistryImpl exceptionOnRegister = new ExceptionOnRegisterRegistry();
 
 	@Before
 	public void createRegistry() {
-		this.registry = new MetricRegistry(defaultMetricRegistryConfiguration);
+		this.registry = new MetricRegistryImpl(defaultMetricRegistryConfiguration);
 	}
 
 	@After
@@ -134,7 +135,7 @@ public class MetricGroupTest extends TestLogger {
 		JobID jid = new JobID();
 		JobVertexID vid = new JobVertexID();
 		AbstractID eid = new AbstractID();
-		MetricRegistry registry = new MetricRegistry(defaultMetricRegistryConfiguration);
+		MetricRegistryImpl registry = new MetricRegistryImpl(defaultMetricRegistryConfiguration);
 		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
 		TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
@@ -156,7 +157,7 @@ public class MetricGroupTest extends TestLogger {
 
 	// ------------------------------------------------------------------------
 
-	private static class ExceptionOnRegisterRegistry extends MetricRegistry {
+	private static class ExceptionOnRegisterRegistry extends MetricRegistryImpl {
 
 		public ExceptionOnRegisterRegistry() {
 			super(defaultMetricRegistryConfiguration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 4363a9d..820b73e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
@@ -47,7 +47,7 @@ public class OperatorGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
 		TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -70,7 +70,7 @@ public class OperatorGroupTest extends TestLogger {
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
 		cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "<tm_id>.<job_id>.<task_id>.<operator_name>.<operator_id>");
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 		try {
 			String tmID = "test-tm-id";
 			JobID jid = new JobID();
@@ -97,7 +97,7 @@ public class OperatorGroupTest extends TestLogger {
 
 	@Test
 	public void testIOMetricGroupInstantiation() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
 		TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -114,7 +114,7 @@ public class OperatorGroupTest extends TestLogger {
 
 	@Test
 	public void testVariables() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		JobID jid = new JobID();
 		JobVertexID tid = new JobVertexID();
@@ -156,7 +156,7 @@ public class OperatorGroupTest extends TestLogger {
 		JobVertexID vid = new JobVertexID();
 		AbstractID eid = new AbstractID();
 		OperatorID oid = new OperatorID();
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 		TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
 		TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index bd85303..3272f73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
@@ -50,7 +50,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 	@Test
 	public void addAndRemoveJobs() throws IOException {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 
 		final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
 				registry, "localhost", new AbstractID().toString());
@@ -112,7 +112,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testCloseClosesAll() throws IOException {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
 			registry, "localhost", new AbstractID().toString());
 
@@ -152,7 +152,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testGenerateScopeDefault() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id");
 
 		assertArrayEquals(new String[]{"localhost", "taskmanager", "id"}, group.getScopeComponents());
@@ -164,7 +164,7 @@ public class TaskManagerGroupTest extends TestLogger {
 	public void testGenerateScopeCustom() {
 		Configuration cfg = new Configuration();
 		cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 		TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
 
 		assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
@@ -174,7 +174,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
 	@Test
 	public void testCreateQueryServiceMetricInfo() {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
 		TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
 
 		QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter());