You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 22:27:44 UTC

[4/7] flink git commit: [FLINK-7876] Register TaskManagerMetricGroup under ResourceID

[FLINK-7876] Register TaskManagerMetricGroup under ResourceID

This commit registers TaskManagerMetricGroups under the TaskManager's
ResourceID instead of the InstanceID. This allows the
TaskManagerMetricGroup to be created at startup of the TaskManager.

Moreover, it pulls the MetricRegistry out of JobManager and TaskManager. This
allows the same MetricRegistry to be reused across multiple instances (e.g. in
the FlinkMiniCluster case). It also ensures proper cleanup of a potentially
started MetricQueryService actor.

Change TaskManagersHandler to work with ResourceID instead of InstanceID

Adapt MetricFetcher to use ResourceID instead of InstanceID

This closes #4872.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d45b9412
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d45b9412
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d45b9412

Branch: refs/heads/master
Commit: d45b941280155a67aed3f3518b2a05eedc1dab2e
Parents: a7e0a27
Author: Till <ti...@gmail.com>
Authored: Thu Oct 19 17:22:53 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Nov 1 15:52:00 2017 +0100

----------------------------------------------------------------------
 .../entrypoint/MesosJobClusterEntrypoint.java   |   6 +-
 .../MesosSessionClusterEntrypoint.java          |   6 +-
 .../MesosApplicationMasterRunner.java           |  13 +
 .../clusterframework/MesosResourceManager.java  |   4 +-
 .../clusterframework/MesosJobManager.scala      |   7 +-
 .../clusterframework/MesosTaskManager.scala     |   6 +-
 .../MesosResourceManagerTest.java               |   8 +-
 .../ScheduledDropwizardReporterTest.java        |   4 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |   6 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |  14 +-
 .../PrometheusReporterTaskScopeTest.java        |   6 +-
 .../prometheus/PrometheusReporterTest.java      |  16 +-
 .../flink/metrics/slf4j/Slf4jReporterTest.java  |   6 +-
 .../metrics/statsd/StatsDReporterTest.java      |  12 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   2 +
 .../runtime/akka/AkkaJobManagerGateway.java     |  10 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  11 +-
 .../dispatcher/StandaloneDispatcher.java        |   6 +-
 .../runtime/entrypoint/ClusterEntrypoint.java   |  10 +-
 .../entrypoint/JobClusterEntrypoint.java        |   8 +-
 .../entrypoint/SessionClusterEntrypoint.java    |   8 +-
 .../StandaloneSessionClusterEntrypoint.java     |   4 +-
 .../runtime/jobmaster/JobManagerGateway.java    |   6 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   4 +-
 .../flink/runtime/metrics/MetricRegistry.java   | 378 +-------------
 .../metrics/MetricRegistryConfiguration.java    |   2 +-
 .../runtime/metrics/MetricRegistryImpl.java     | 412 +++++++++++++++
 .../metrics/groups/AbstractMetricGroup.java     |   2 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  12 +-
 .../minicluster/MiniClusterJobDispatcher.java   |   8 +-
 .../minicluster/StandaloneMiniCluster.java      |  29 +-
 .../resourcemanager/ResourceManager.java        |  12 +-
 .../resourcemanager/ResourceManagerGateway.java |   2 +-
 .../resourcemanager/ResourceManagerRunner.java  |   4 +-
 .../StandaloneResourceManager.java              |   4 +-
 .../handler/legacy/TaskManagerLogHandler.java   |   8 +-
 .../handler/legacy/TaskManagersHandler.java     |  21 +-
 .../handler/legacy/metrics/MetricFetcher.java   |  10 +-
 .../runtime/taskexecutor/TaskExecutor.java      |   6 +-
 .../runtime/taskexecutor/TaskManagerRunner.java |  24 +-
 .../taskexecutor/TaskManagerServices.java       |  18 +-
 .../TaskManagerServicesConfiguration.java       |  13 -
 .../runtime/webmonitor/RestfulGateway.java      |   4 +-
 .../ContaineredJobManager.scala                 |   7 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 113 ++---
 .../runtime/messages/JobManagerMessages.scala   |   7 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   8 +
 .../minicluster/LocalFlinkMiniCluster.scala     |  52 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  48 +-
 .../clusterframework/ResourceManagerTest.java   |   6 +-
 .../runtime/dispatcher/DispatcherTest.java      |  10 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  13 +-
 .../runtime/jobmanager/JobManagerTest.java      |  11 +
 .../flink/runtime/jobmanager/JobSubmitTest.java |   2 +
 .../jobmaster/JobManagerRunnerMockTest.java     |   4 +-
 .../JobManagerLeaderElectionTest.java           |   5 +-
 .../runtime/metrics/MetricRegistryImplTest.java | 496 +++++++++++++++++++
 .../runtime/metrics/MetricRegistryTest.java     | 496 -------------------
 .../runtime/metrics/NoOpMetricRegistry.java     |  60 +++
 .../runtime/metrics/TaskManagerMetricsTest.java |  16 +-
 .../metrics/dump/MetricQueryServiceTest.java    |   4 +-
 .../metrics/groups/AbstractMetricGroupTest.java |   8 +-
 .../metrics/groups/JobManagerGroupTest.java     |  12 +-
 .../metrics/groups/JobManagerJobGroupTest.java  |  10 +-
 .../groups/MetricGroupRegistrationTest.java     |   9 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  11 +-
 .../metrics/groups/OperatorGroupTest.java       |  12 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  12 +-
 .../metrics/groups/TaskManagerJobGroupTest.java |  10 +-
 .../metrics/groups/TaskMetricGroupTest.java     |  14 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |   4 +-
 .../resourcemanager/ResourceManagerHATest.java  |   4 +-
 .../ResourceManagerJobMasterTest.java           |   4 +-
 .../ResourceManagerTaskExecutorTest.java        |   4 +-
 .../legacy/TaskManagerLogHandlerTest.java       |   2 +-
 .../legacy/metrics/MetricFetcherTest.java       |  10 +-
 .../taskexecutor/TaskExecutorITCase.java        |   4 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  24 +-
 .../taskexecutor/TaskManagerServicesTest.java   |   3 +-
 ...askManagerComponentsStartupShutdownTest.java |   7 +-
 .../TaskManagerProcessReapingTestBase.java      |  20 +-
 .../taskmanager/TaskManagerStartupTest.java     |   2 +
 .../jobmanager/JobManagerRegistrationTest.scala |  17 +-
 .../runtime/testingUtils/TestingCluster.scala   |   7 +-
 .../testingUtils/TestingJobManager.scala        |   6 +-
 .../testingUtils/TestingTaskManager.scala       |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |  21 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |   2 +
 .../JobManagerHACheckpointRecoveryITCase.java   |   2 +
 .../JobManagerHAJobGraphRecoveryITCase.java     |   2 +
 ...agerHAProcessFailureBatchRecoveryITCase.java |   2 +
 .../recovery/ProcessFailureCancelingITCase.java |   2 +
 .../AbstractOperatorRestoreTestBase.java        |   3 +
 .../flink/yarn/TestingYarnJobManager.scala      |   6 +-
 .../flink/yarn/TestingYarnTaskManager.scala     |   6 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  13 +
 .../apache/flink/yarn/YarnResourceManager.java  |   4 +-
 .../entrypoint/YarnJobClusterEntrypoint.java    |   4 +-
 .../YarnSessionClusterEntrypoint.java           |   4 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   7 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   6 +-
 101 files changed, 1503 insertions(+), 1317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 9a3639d..b98adff 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -112,7 +112,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 	}
 
 	@Override
-	protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
+	protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistryImpl metricRegistry) throws Exception {
 		super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
 	}
 
@@ -123,7 +123,7 @@ public class MesosJobClusterEntrypoint extends JobClusterEntrypoint {
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
 		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
+		MetricRegistryImpl metricRegistry,
 		FatalErrorHandler fatalErrorHandler) throws Exception {
 		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index aa511b0..0cf0fce 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -102,7 +102,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 	}
 
 	@Override
-	protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
+	protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistryImpl metricRegistry) throws Exception {
 		super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
 	}
 
@@ -113,7 +113,7 @@ public class MesosSessionClusterEntrypoint extends SessionClusterEntrypoint {
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
 		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
+		MetricRegistryImpl metricRegistry,
 		FatalErrorHandler fatalErrorHandler) throws Exception {
 		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index a6ea133..9887d97 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -39,6 +39,8 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -200,6 +202,7 @@ public class MesosApplicationMasterRunner {
 		ExecutorService ioExecutor = null;
 		MesosServices mesosServices = null;
 		HighAvailabilityServices highAvailabilityServices = null;
+		MetricRegistryImpl metricRegistry = null;
 
 		try {
 			// ------- (1) load and parse / validate all configurations -------
@@ -304,6 +307,11 @@ public class MesosApplicationMasterRunner {
 			// 2: the JobManager
 			LOG.debug("Starting JobManager actor");
 
+			metricRegistry = new MetricRegistryImpl(
+				MetricRegistryConfiguration.fromConfiguration(config));
+
+			metricRegistry.startQueryService(actorSystem, null);
+
 			// we start the JobManager with its standard name
 			ActorRef jobManager = JobManager.startJobManagerActors(
 				config,
@@ -311,6 +319,7 @@ public class MesosApplicationMasterRunner {
 				futureExecutor,
 				ioExecutor,
 				highAvailabilityServices,
+				metricRegistry,
 				webMonitor != null ? Option.apply(webMonitor.getRestAddress()) : Option.empty(),
 				Option.apply(JobMaster.JOB_MANAGER_NAME),
 				Option.apply(JobMaster.ARCHIVE_NAME),
@@ -422,6 +431,10 @@ public class MesosApplicationMasterRunner {
 			}
 		}
 
+		if (metricRegistry != null) {
+			metricRegistry.shutdown();
+		}
+
 		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
 			AkkaUtils.getTimeout(config).toMillis(),
 			TimeUnit.MILLISECONDS,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 1e32b2c..7ea4908 100644
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -145,7 +145,7 @@ public class MesosResourceManager extends ResourceManager<RegisteredMesosWorkerN
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
 			SlotManager slotManager,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler,
 			// Mesos specifics

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 9ad8eb2..c6230e7 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -31,7 +31,8 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry}
 
 import scala.concurrent.duration._
 
@@ -66,7 +67,7 @@ class MesosJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[FlinkMetricRegistry],
+    jobManagerMetricGroup: JobManagerMetricGroup,
     optRestAddress: Option[String])
   extends ContaineredJobManager(
     flinkConfiguration,
@@ -83,7 +84,7 @@ class MesosJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     jobRecoveryTimeout,
-    metricsRegistry,
+    jobManagerMetricGroup,
     optRestAddress) {
 
   val jobPollingInterval: FiniteDuration = 5 seconds

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index 7834639..e69472e 100644
--- a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
@@ -39,7 +39,7 @@ class MesosTaskManager(
     network: NetworkEnvironment,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
-    metricRegistry : MetricRegistry)
+    taskManagerMetricGroup : TaskManagerMetricGroup)
   extends TaskManager(
     config,
     resourceID,
@@ -49,7 +49,7 @@ class MesosTaskManager(
     network,
     numberOfSlots,
     highAvailabilityServices,
-    metricRegistry) {
+    taskManagerMetricGroup) {
 
   override def handleMessage: Receive = {
     super.handleMessage

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 4bdd9a3..1cdd087 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -160,7 +160,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
 			SlotManager slotManager,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler,
 
@@ -306,7 +306,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			public final ScheduledExecutor scheduledExecutor;
 			public final TestingHighAvailabilityServices highAvailabilityServices;
 			public final HeartbeatServices heartbeatServices;
-			public final MetricRegistry metricRegistry;
+			public final MetricRegistryImpl metricRegistry;
 			public final TestingLeaderElectionService rmLeaderElectionService;
 			public final JobLeaderIdService jobLeaderIdService;
 			public final SlotManager slotManager;
@@ -321,7 +321,7 @@ public class MesosResourceManagerTest extends TestLogger {
 				rmLeaderElectionService = new TestingLeaderElectionService();
 				highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 				heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
-				metricRegistry = mock(MetricRegistry.class);
+				metricRegistry = mock(MetricRegistryImpl.class);
 				slotManager = mock(SlotManager.class);
 				slotManagerStarted = new CompletableFuture<>();
 				jobLeaderIdService = new JobLeaderIdService(

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 5ed6de2..e6d5e27 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -92,7 +92,7 @@ public class ScheduledDropwizardReporterTest {
 
 		MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);
 
-		MetricRegistry metricRegistry = new MetricRegistry(metricRegistryConfiguration);
+		MetricRegistryImpl metricRegistry = new MetricRegistryImpl(metricRegistryConfiguration);
 
 		char delimiter = metricRegistry.getDelimiter();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 63765ae..a927a30 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.util.TestLogger;
@@ -105,12 +105,12 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
 		config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");
 
-		MetricRegistry registry = null;
+		MetricRegistryImpl registry = null;
 
 		MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
 
 		try {
-			registry = new MetricRegistry(metricRegistryConfiguration);
+			registry = new MetricRegistryImpl(metricRegistryConfiguration);
 			DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
 				new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index f10769a..4c97055 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestMeter;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -108,7 +108,7 @@ public class JMXReporterTest extends TestLogger {
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9020-9035");
 
-		MetricRegistry reg = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
 
@@ -168,7 +168,7 @@ public class JMXReporterTest extends TestLogger {
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 		cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9040-9055");
 
-		MetricRegistry reg = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+		MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
 		TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");
 
@@ -231,7 +231,7 @@ public class JMXReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testHistogramReporting() throws Exception {
-		MetricRegistry registry = null;
+		MetricRegistryImpl registry = null;
 		String histogramName = "histogram";
 
 		try {
@@ -239,7 +239,7 @@ public class JMXReporterTest extends TestLogger {
 			config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
-			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+			registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 
@@ -281,7 +281,7 @@ public class JMXReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testMeterReporting() throws Exception {
-		MetricRegistry registry = null;
+		MetricRegistryImpl registry = null;
 		String meterName = "meter";
 
 		try {
@@ -289,7 +289,7 @@ public class JMXReporterTest extends TestLogger {
 			config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
-			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+			registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
index 0ae8fc7..55ddc00 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -75,12 +75,12 @@ public class PrometheusReporterTaskScopeTest {
 	private TaskMetricGroup taskMetricGroup1;
 	private TaskMetricGroup taskMetricGroup2;
 
-	private MetricRegistry registry;
+	private MetricRegistryImpl registry;
 	private PrometheusReporter reporter;
 
 	@Before
 	public void setupReporter() {
-		registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
+		registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
 		reporter = (PrometheusReporter) registry.getReporters().get(0);
 
 		TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 0d7be6d..6704189 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestMeter;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
@@ -69,13 +69,13 @@ public class PrometheusReporterTest extends TestLogger {
 	@Rule
 	public ExpectedException thrown = ExpectedException.none();
 
-	private MetricRegistry registry;
+	private MetricRegistryImpl registry;
 	private FrontMetricGroup<TaskManagerMetricGroup> metricGroup;
 	private PrometheusReporter reporter;
 
 	@Before
 	public void setupReporter() {
-		registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
+		registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
 		metricGroup = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER));
 		reporter = (PrometheusReporter) registry.getReporters().get(0);
 	}
@@ -158,7 +158,7 @@ public class PrometheusReporterTest extends TestLogger {
 
 	@Test
 	public void endpointIsUnavailableAfterReporterIsClosed() throws UnirestException {
-		MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
+		MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
 		PrometheusReporter reporter = (PrometheusReporter) registry.getReporters().get(0);
 		reporter.close();
 		thrown.expect(UnirestException.class);
@@ -244,12 +244,12 @@ public class PrometheusReporterTest extends TestLogger {
 
 	@Test
 	public void cannotStartTwoReportersOnSamePort() {
-		final MetricRegistry fixedPort1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
+		final MetricRegistryImpl fixedPort1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
 		assertThat(fixedPort1.getReporters(), hasSize(1));
 
 		PrometheusReporter firstReporter = (PrometheusReporter) fixedPort1.getReporters().get(0);
 
-		final MetricRegistry fixedPort2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", String.valueOf(firstReporter.getPort()))));
+		final MetricRegistryImpl fixedPort2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", String.valueOf(firstReporter.getPort()))));
 		assertThat(fixedPort2.getReporters(), hasSize(0));
 
 		fixedPort1.shutdown();
@@ -258,8 +258,8 @@ public class PrometheusReporterTest extends TestLogger {
 
 	@Test
 	public void canStartTwoReportersWhenUsingPortRange() {
-		final MetricRegistry portRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9200-9300")));
-		final MetricRegistry portRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9200-9300")));
+		final MetricRegistryImpl portRange1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9200-9300")));
+		final MetricRegistryImpl portRange2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9200-9300")));
 
 		assertThat(portRange1.getReporters(), hasSize(1));
 		assertThat(portRange2.getReporters(), hasSize(1));

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
index 51724bd..ba7c5a1 100644
--- a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
+++ b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
@@ -29,8 +29,8 @@ import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
@@ -53,7 +53,7 @@ public class Slf4jReporterTest extends TestLogger {
 	private static final String TASK_MANAGER_ID = "tm01";
 	private static final String JOB_NAME = "jn01";
 	private static final String TASK_NAME = "tn01";
-	private static MetricRegistry registry;
+	private static MetricRegistryImpl registry;
 	private static char delimiter;
 	private static TaskMetricGroup taskMetricGroup;
 	private static Slf4jReporter reporter;
@@ -68,7 +68,7 @@ public class Slf4jReporterTest extends TestLogger {
 			ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, Slf4jReporter.class.getName());
 		configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
 
-		registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+		registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 		delimiter = registry.getDelimiter();
 
 		taskMetricGroup = new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER_ID)

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 94de9a9..f460abd 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -89,7 +89,7 @@ public class StatsDReporterTest extends TestLogger {
 		configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
 		configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
 
-		MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+		MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
 		char delimiter = metricRegistry.getDelimiter();
 
@@ -133,7 +133,7 @@ public class StatsDReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testStatsDHistogramReporting() throws Exception {
-		MetricRegistry registry = null;
+		MetricRegistryImpl registry = null;
 		DatagramSocketReceiver receiver = null;
 		Thread receiverThread = null;
 		long timeout = 5000;
@@ -157,7 +157,7 @@ public class StatsDReporterTest extends TestLogger {
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + port);
 
-			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+			registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 
@@ -207,7 +207,7 @@ public class StatsDReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testStatsDMetersReporting() throws Exception {
-		MetricRegistry registry = null;
+		MetricRegistryImpl registry = null;
 		DatagramSocketReceiver receiver = null;
 		Thread receiverThread = null;
 		long timeout = 5000;
@@ -231,7 +231,7 @@ public class StatsDReporterTest extends TestLogger {
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", "localhost");
 			config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + port);
 
-			registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+			registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
 			TestMeter meter = new TestMeter();
 			metricGroup.meter(meterName, meter);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index f9dd98e..75a844c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.rest.handler.util.MimeTypes;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -193,6 +194,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 					TestingUtils.defaultExecutor(),
 					TestingUtils.defaultExecutor(),
 					highAvailabilityServices,
+					new NoOpMetricRegistry(),
 					Option.apply(webMonitor[i].getRestAddress()),
 					JobManager.class,
 					MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 6896852..08946ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.akka;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -177,10 +177,10 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	//--------------------------------------------------------------------------------
 
 	@Override
-	public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(InstanceID instanceId, Time timeout) {
+	public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout) {
 		return FutureUtils.toJava(
 			jobManagerGateway
-				.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceId), FutureUtils.toFiniteDuration(timeout))
+				.ask(new JobManagerMessages.RequestTaskManagerInstance(resourceId), FutureUtils.toFiniteDuration(timeout))
 				.mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)))
 			.thenApply(
 				(JobManagerMessages.TaskManagerInstance taskManagerResponse) -> {
@@ -265,7 +265,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+	public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
 		return requestTaskManagerInstances(timeout)
 			.thenApply(
 				(Collection<Instance> instances) ->
@@ -277,7 +277,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 								final String taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
 									MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + instance.getTaskManagerID().getResourceIdString();
 
-								return Tuple2.of(instance.getId(), taskManagerMetricQuerServicePath);
+								return Tuple2.of(instance.getTaskManagerID(), taskManagerMetricQuerServicePath);
 							})
 						.collect(Collectors.toList()));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index dda0275..c2f8539 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -46,7 +45,7 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -89,7 +88,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	private final ResourceManagerGateway resourceManagerGateway;
 	private final JobManagerServices jobManagerServices;
 	private final HeartbeatServices heartbeatServices;
-	private final MetricRegistry metricRegistry;
+	private final MetricRegistryImpl metricRegistry;
 
 	private final FatalErrorHandler fatalErrorHandler;
 
@@ -107,7 +106,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 			ResourceManagerGateway resourceManagerGateway,
 			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			FatalErrorHandler fatalErrorHandler,
 			Optional<String> restAddress) throws Exception {
 		super(rpcService, endpointId);
@@ -384,7 +383,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	@Override
-	public CompletableFuture<Collection<Tuple2<InstanceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+	public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
 		return resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout);
 	}
 
@@ -480,7 +479,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		HighAvailabilityServices highAvailabilityServices,
 		HeartbeatServices heartbeatServices,
 		JobManagerServices jobManagerServices,
-		MetricRegistry metricRegistry,
+		MetricRegistryImpl metricRegistry,
 		OnCompletionActions onCompleteActions,
 		FatalErrorHandler fatalErrorHandler) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5a6889e..ee92663 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -49,7 +49,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			ResourceManagerGateway resourceManagerGateway,
 			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			FatalErrorHandler fatalErrorHandler,
 			Optional<String> restAddress) throws Exception {
 		super(
@@ -74,7 +74,7 @@ public class StandaloneDispatcher extends Dispatcher {
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
 			JobManagerServices jobManagerServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			OnCompletionActions onCompleteActions,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 		// create the standard job manager runner

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index b2ddf1d..1a0e2ae 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -76,7 +76,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 	private final CompletableFuture<Boolean> terminationFuture;
 
 	@GuardedBy("lock")
-	private MetricRegistry metricRegistry = null;
+	private MetricRegistryImpl metricRegistry = null;
 
 	@GuardedBy("lock")
 	private HighAvailabilityServices haServices = null;
@@ -204,8 +204,8 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		return HeartbeatServices.fromConfiguration(configuration);
 	}
 
-	protected MetricRegistry createMetricRegistry(Configuration configuration) {
-		return new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+	protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
+		return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 	}
 
 	protected void shutDown(boolean cleanupHaData) throws FlinkException {
@@ -278,7 +278,7 @@ public abstract class ClusterEntrypoint implements FatalErrorHandler {
 		HighAvailabilityServices highAvailabilityServices,
 		BlobServer blobServer,
 		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry) throws Exception;
+		MetricRegistryImpl metricRegistry) throws Exception;
 
 	protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 124c6c6..50d29da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -59,7 +59,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			HighAvailabilityServices highAvailabilityServices,
 			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry) throws Exception {
+			MetricRegistryImpl metricRegistry) throws Exception {
 
 		resourceManager = createResourceManager(
 			configuration,
@@ -96,7 +96,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 			HighAvailabilityServices highAvailabilityServices,
 			JobManagerServices jobManagerServices,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 
 		JobGraph jobGraph = retrieveJobGraph(configuration);
@@ -163,7 +163,7 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
 		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
+		MetricRegistryImpl metricRegistry,
 		FatalErrorHandler fatalErrorHandler) throws Exception;
 
 	protected abstract JobGraph retrieveJobGraph(Configuration configuration) throws FlinkException;

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index e24e01a..8a48864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -69,7 +69,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			HighAvailabilityServices highAvailabilityServices,
 			BlobServer blobServer,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry) throws Exception {
+			MetricRegistryImpl metricRegistry) throws Exception {
 
 		dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
 
@@ -173,7 +173,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 		ResourceManagerGateway resourceManagerGateway,
 		BlobServer blobServer,
 		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
+		MetricRegistryImpl metricRegistry,
 		FatalErrorHandler fatalErrorHandler,
 		Optional<String> restAddress) throws Exception {
 
@@ -197,6 +197,6 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 		RpcService rpcService,
 		HighAvailabilityServices highAvailabilityServices,
 		HeartbeatServices heartbeatServices,
-		MetricRegistry metricRegistry,
+		MetricRegistryImpl metricRegistry,
 		FatalErrorHandler fatalErrorHandler) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index e7c9816..7d4373d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -51,7 +51,7 @@ public class StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			MetricRegistry metricRegistry,
+			MetricRegistryImpl metricRegistry,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index 782d6d0..2527e46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -107,11 +107,11 @@ public interface JobManagerGateway extends RestfulGateway {
-	 * Requests the TaskManager instance registered under the given instanceId from the JobManager.
+	 * Requests the TaskManager instance registered under the given resourceId from the JobManager.
 	 * If there is no Instance registered, then {@link Optional#empty()} is returned.
 	 *
-	 * @param instanceId for which to retrieve the Instance
+	 * @param resourceId identifying the TaskManager which shall be retrieved
 	 * @param timeout for the asynchronous operation
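-	 * @return Future containing the TaskManager instance registered under instanceId, otherwise {@link Optional#empty()}
+	 * @return Future containing the TaskManager instance registered under resourceId, otherwise {@link Optional#empty()}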
 	 */
-	CompletableFuture<Optional<Instance>> requestTaskManagerInstance(InstanceID instanceId, Time timeout);
+	CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout);
 
 	/**
 	 * Requests all currently registered TaskManager instances from the JobManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 14baa6f..0a85bbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -111,7 +111,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			final HighAvailabilityServices haServices,
 			final HeartbeatServices heartbeatServices,
 			final JobManagerServices jobManagerServices,
-			final MetricRegistry metricRegistry,
+			final MetricRegistryImpl metricRegistry,
 			final OnCompletionActions toNotifyOnComplete,
 			final FatalErrorHandler errorHandler) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 278292d..9aa97cb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -18,283 +18,34 @@
 
 package org.apache.flink.runtime.metrics;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.View;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.util.Preconditions;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.pattern.Patterns;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TimerTask;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 /**
- * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
- * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
+ * Interface for a metric registry, which keeps track of all registered {@link Metric Metrics}.
  */
-public class MetricRegistry {
-	static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
-
-	private final Object lock = new Object();
-
-	private List<MetricReporter> reporters;
-	private ScheduledExecutorService executor;
-
-	@Nullable
-	private ActorRef queryService;
-
-	@Nullable
-	private String metricQueryServicePath;
-
-	private ViewUpdater viewUpdater;
-
-	private final ScopeFormats scopeFormats;
-	private final char globalDelimiter;
-	private final List<Character> delimiters = new ArrayList<>();
-
-	/**
-	 * Creates a new MetricRegistry and starts the configured reporter.
-	 */
-	public MetricRegistry(MetricRegistryConfiguration config) {
-		this.scopeFormats = config.getScopeFormats();
-		this.globalDelimiter = config.getDelimiter();
-
-		// second, instantiate any custom configured reporters
-		this.reporters = new ArrayList<>();
-
-		List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
-
-		this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
-
-		this.queryService = null;
-		this.metricQueryServicePath = null;
-
-		if (reporterConfigurations.isEmpty()) {
-			// no reporters defined
-			// by default, don't report anything
-			LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
-		} else {
-			// we have some reporters so
-			for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-				String namedReporter = reporterConfiguration.f0;
-				Configuration reporterConfig = reporterConfiguration.f1;
-
-				final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-				if (className == null) {
-					LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-					continue;
-				}
-
-				try {
-					String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
-					TimeUnit timeunit = TimeUnit.SECONDS;
-					long period = 10;
-
-					if (configuredPeriod != null) {
-						try {
-							String[] interval = configuredPeriod.split(" ");
-							period = Long.parseLong(interval[0]);
-							timeunit = TimeUnit.valueOf(interval[1]);
-						}
-						catch (Exception e) {
-							LOG.error("Cannot parse report interval from config: " + configuredPeriod +
-									" - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
-									"Using default reporting interval.");
-						}
-					}
-
-					Class<?> reporterClass = Class.forName(className);
-					MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();
-
-					MetricConfig metricConfig = new MetricConfig();
-					reporterConfig.addAllToProperties(metricConfig);
-					LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig);
-					reporterInstance.open(metricConfig);
-
-					if (reporterInstance instanceof Scheduled) {
-						LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
-
-						executor.scheduleWithFixedDelay(
-								new MetricRegistry.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
-					} else {
-						LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
-					}
-					reporters.add(reporterInstance);
-
-					String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
-					if (delimiterForReporter.length() != 1) {
-						LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
-						delimiterForReporter = String.valueOf(globalDelimiter);
-					}
-					this.delimiters.add(delimiterForReporter.charAt(0));
-				}
-				catch (Throwable t) {
-					LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Initializes the MetricQueryService.
-	 *
-	 * @param actorSystem ActorSystem to create the MetricQueryService on
-	 * @param resourceID resource ID used to disambiguate the actor name
-     */
-	public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
-		synchronized (lock) {
-			Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
-
-			try {
-				queryService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
-				metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, queryService);
-			} catch (Exception e) {
-				LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
-			}
-		}
-	}
-
-	/**
-	 * Returns the address under which the {@link MetricQueryService} is reachable.
-	 *
-	 * @return address of the metric query service
-	 */
-	@Nullable
-	public String getMetricQueryServicePath() {
-		return metricQueryServicePath;
-	}
+public interface MetricRegistry {
 
 	/**
 	 * Returns the global delimiter.
 	 *
 	 * @return global delimiter
 	 */
-	public char getDelimiter() {
-		return this.globalDelimiter;
-	}
+	char getDelimiter();
 
 	/**
 	 * Returns the configured delimiter for the reporter with the given index.
 	 *
-	 * @param reporterIndex index of the reporter whose delimiter should be used
+	 * @param index index of the reporter whose delimiter should be used
 	 * @return configured reporter delimiter, or global delimiter if index is invalid
 	 */
-	public char getDelimiter(int reporterIndex) {
-		try {
-			return delimiters.get(reporterIndex);
-		} catch (IndexOutOfBoundsException e) {
-			LOG.warn("Delimiter for reporter index {} not found, returning global delimiter.", reporterIndex);
-			return this.globalDelimiter;
-		}
-	}
-
-	public List<MetricReporter> getReporters() {
-		return reporters;
-	}
-
-	/**
-	 * Returns whether this registry has been shutdown.
-	 *
-	 * @return true, if this registry was shutdown, otherwise false
-	 */
-	public boolean isShutdown() {
-		synchronized (lock) {
-			return reporters == null && executor.isShutdown();
-		}
-	}
+	char getDelimiter(int index);
 
 	/**
-	 * Shuts down this registry and the associated {@link MetricReporter}.
+	 * Returns the number of registered reporters.
 	 */
-	public void shutdown() {
-		synchronized (lock) {
-			Future<Boolean> stopFuture = null;
-			FiniteDuration stopTimeout = null;
-
-			if (queryService != null) {
-				stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
-				stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
-			}
-
-			if (reporters != null) {
-				for (MetricReporter reporter : reporters) {
-					try {
-						reporter.close();
-					} catch (Throwable t) {
-						LOG.warn("Metrics reporter did not shut down cleanly", t);
-					}
-				}
-				reporters = null;
-			}
-			shutdownExecutor();
-
-			if (stopFuture != null) {
-				boolean stopped = false;
-
-				try {
-					stopped = Await.result(stopFuture, stopTimeout);
-				} catch (Exception e) {
-					LOG.warn("Query actor did not properly stop.", e);
-				}
-
-				if (!stopped) {
-					// the query actor did not stop in time, let's kill him
-					queryService.tell(Kill.getInstance(), ActorRef.noSender());
-				}
-			}
-		}
-	}
-
-	private void shutdownExecutor() {
-		if (executor != null) {
-			executor.shutdown();
-
-			try {
-				if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
-					executor.shutdownNow();
-				}
-			} catch (InterruptedException e) {
-				executor.shutdownNow();
-			}
-		}
-	}
-
-	public ScopeFormats getScopeFormats() {
-		return scopeFormats;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Metrics (de)registration
-	// ------------------------------------------------------------------------
+	int getNumberReporters();
 
 	/**
 	 * Registers a new {@link Metric} with this registry.
@@ -303,44 +54,7 @@ public class MetricRegistry {
 	 * @param metricName  the name of the metric
 	 * @param group       the group that contains the metric
 	 */
-	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
-		synchronized (lock) {
-			if (isShutdown()) {
-				LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
-			} else {
-				if (reporters != null) {
-					for (int i = 0; i < reporters.size(); i++) {
-						MetricReporter reporter = reporters.get(i);
-						try {
-							if (reporter != null) {
-								FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
-								reporter.notifyOfAddedMetric(metric, metricName, front);
-							}
-						} catch (Exception e) {
-							LOG.warn("Error while registering metric.", e);
-						}
-					}
-				}
-				try {
-					if (queryService != null) {
-						MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
-					}
-				} catch (Exception e) {
-					LOG.warn("Error while registering metric.", e);
-				}
-				try {
-					if (metric instanceof View) {
-						if (viewUpdater == null) {
-							viewUpdater = new ViewUpdater(executor);
-						}
-						viewUpdater.notifyOfAddedView((View) metric);
-					}
-				} catch (Exception e) {
-					LOG.warn("Error while registering metric.", e);
-				}
-			}
-		}
-	}
+	void register(Metric metric, String metricName, AbstractMetricGroup group);
 
 	/**
 	 * Un-registers the given {@link Metric} with this registry.
@@ -349,79 +63,7 @@ public class MetricRegistry {
 	 * @param metricName  the name of the metric
 	 * @param group       the group that contains the metric
 	 */
-	public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
-		synchronized (lock) {
-			if (isShutdown()) {
-				LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
-			} else {
-				if (reporters != null) {
-					for (int i = 0; i < reporters.size(); i++) {
-						try {
-						MetricReporter reporter = reporters.get(i);
-							if (reporter != null) {
-								FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
-								reporter.notifyOfRemovedMetric(metric, metricName, front);
-							}
-						} catch (Exception e) {
-							LOG.warn("Error while registering metric.", e);
-						}
-					}
-				}
-				try {
-					if (queryService != null) {
-						MetricQueryService.notifyOfRemovedMetric(queryService, metric);
-					}
-				} catch (Exception e) {
-					LOG.warn("Error while registering metric.", e);
-				}
-				try {
-					if (metric instanceof View) {
-						if (viewUpdater != null) {
-							viewUpdater.notifyOfRemovedView((View) metric);
-						}
-					}
-				} catch (Exception e) {
-					LOG.warn("Error while registering metric.", e);
-				}
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	@VisibleForTesting
-	@Nullable
-	public ActorRef getQueryService() {
-		return queryService;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * This task is explicitly a static class, so that it does not hold any references to the enclosing
-	 * MetricsRegistry instance.
-	 *
-	 * <p>This is a subtle difference, but very important: With this static class, the enclosing class instance
-	 * may become garbage-collectible, whereas with an anonymous inner class, the timer thread
-	 * (which is a GC root) will hold a reference via the timer task and its enclosing instance pointer.
-	 * Making the MetricsRegistry garbage collectible makes the java.util.Timer garbage collectible,
-	 * which acts as a fail-safe to stop the timer thread and prevents resource leaks.
-	 */
-	private static final class ReporterTask extends TimerTask {
-
-		private final Scheduled reporter;
-
-		private ReporterTask(Scheduled reporter) {
-			this.reporter = reporter;
-		}
+	void unregister(Metric metric, String metricName, AbstractMetricGroup group);
 
-		@Override
-		public void run() {
-			try {
-				reporter.report();
-			} catch (Throwable t) {
-				LOG.warn("Error while reporting metrics", t);
-			}
-		}
-	}
+	ScopeFormats getScopeFormats();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index e72a980..d07cb65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -35,7 +35,7 @@ import java.util.List;
 import java.util.regex.Pattern;
 
 /**
- * Configuration object for {@link MetricRegistry}.
+ * Configuration object for {@link MetricRegistryImpl}.
  */
 public class MetricRegistryConfiguration {