You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 22:27:45 UTC

[5/7] flink git commit: [FLINK-7876] Properly start and shut down MetricRegistry by ClusterEntrypoint

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index e4ceb40..72c03af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.flink.runtime.leaderelection;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.test.TestingServer;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -42,25 +33,37 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+
+import java.util.concurrent.TimeUnit;
+
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.concurrent.TimeUnit;
-
 public class JobManagerLeaderElectionTest extends TestLogger {
 
 	@Rule
@@ -198,7 +201,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 			submittedJobGraphStore,
 			checkpointRecoveryFactory,
 			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-			Option.<MetricRegistryImpl>empty(),
+			new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
 			Option.<String>empty());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
index 1140e3d..46d6548 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
@@ -23,6 +23,8 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 
+import javax.annotation.Nullable;
+
 /**
  * Metric registry which does nothing and is intended for testing purposes.
  */
@@ -57,4 +59,10 @@ public class NoOpMetricRegistry implements MetricRegistry {
 	public ScopeFormats getScopeFormats() {
 		return scopeFormats;
 	}
+
+	@Nullable
+	@Override
+	public String getMetricQueryServicePath() {
+		return null;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 31304e5..d934ea9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
 import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
@@ -94,8 +96,12 @@ public class TaskManagerMetricsTest extends TestLogger {
 
 			TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
 				taskManagerServicesConfiguration,
-				tmResourceID,
-				metricRegistry);
+				tmResourceID);
+
+			TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
+				metricRegistry,
+				taskManagerServices.getTaskManagerLocation(),
+				taskManagerServices.getNetworkEnvironment());
 
 			// create the task manager
 			final Props tmProps = TaskManager.getTaskManagerProps(
@@ -107,7 +113,7 @@ public class TaskManagerMetricsTest extends TestLogger {
 				taskManagerServices.getIOManager(),
 				taskManagerServices.getNetworkEnvironment(),
 				highAvailabilityServices,
-				taskManagerServices.getTaskManagerMetricGroup());
+				taskManagerMetricGroup);
 
 			final ActorRef taskManager = actorSystem.actorOf(tmProps);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index 3d29815..7065e6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -22,22 +22,22 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.UUID;
 
 public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
 	
-	private static final MetricRegistryImpl EMPTY_REGISTRY = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+	private static final MetricRegistry EMPTY_REGISTRY = new NoOpMetricRegistry();
 
 	
 	public UnregisteredTaskMetricsGroup() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 7b10db6..faa97a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -72,8 +71,7 @@ public class MetricFetcherTest extends TestLogger {
 
 		// ========= setup TaskManager =================================================================================
 		JobID jobID = new JobID();
-		InstanceID tmID = new InstanceID();
-		ResourceID tmRID = new ResourceID(tmID.toString());
+		ResourceID tmRID = ResourceID.generate();
 
 		// ========= setup JobManager ==================================================================================
 		JobDetails details = mock(JobDetails.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 7c6b7dd..1f1d09d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -153,7 +153,6 @@ public class TaskExecutorITCase extends TestLogger {
 			networkEnvironment,
 			testingHAServices,
 			heartbeatServices,
-			metricRegistry,
 			taskManagerMetricGroup,
 			broadcastVariableManager,
 			fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index fcd6e4e..de807a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -45,8 +45,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -61,7 +61,6 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -114,7 +113,16 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.RETURNS_MOCKS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TaskExecutorTest extends TestLogger {
 
@@ -205,7 +213,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServices,
 			heartbeatServices,
-			mock(MetricRegistryImpl.class),
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -311,7 +318,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServices,
 			heartbeatServices,
-			mock(MetricRegistryImpl.class),
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -429,7 +435,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServices,
 			heartbeatServices,
-			mock(MetricRegistryImpl.class),
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -522,7 +527,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServices,
 			mock(HeartbeatServices.class, RETURNS_MOCKS),
-			mock(MetricRegistryImpl.class),
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -605,7 +609,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServices,
 			mock(HeartbeatServices.class, RETURNS_MOCKS),
-			mock(MetricRegistryImpl.class),
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -747,7 +750,6 @@ public class TaskExecutorTest extends TestLogger {
 			networkEnvironment,
 			haServices,
 			mock(HeartbeatServices.class, RETURNS_MOCKS),
-			mock(MetricRegistryImpl.class),
 			taskManagerMetricGroup,
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -863,7 +865,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServices,
 			mock(HeartbeatServices.class, RETURNS_MOCKS),
-			mock(MetricRegistryImpl.class),
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -981,7 +982,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServices,
 			mock(HeartbeatServices.class, RETURNS_MOCKS),
-			mock(MetricRegistryImpl.class),
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -1074,7 +1074,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServices,
 			mock(HeartbeatServices.class, RETURNS_MOCKS),
-			mock(MetricRegistryImpl.class),
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -1248,7 +1247,6 @@ public class TaskExecutorTest extends TestLogger {
 			networkMock,
 			haServices,
 			mock(HeartbeatServices.class, RETURNS_MOCKS),
-			mock(MetricRegistryImpl.class),
 			taskManagerMetricGroup,
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -1371,7 +1369,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServicesMock,
 			heartbeatServicesMock,
-			mock(MetricRegistryImpl.class),
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 8249fca..4b62770 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import static org.junit.Assert.*;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
@@ -49,21 +41,28 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-
 import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
 import org.junit.Test;
 
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
 import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertTrue;
 
 public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 
@@ -169,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				network,
 				numberOfSlots,
 				highAvailabilityServices,
-				new MetricRegistryImpl(metricRegistryConfiguration));
+				new TaskManagerMetricGroup(new NoOpMetricRegistry(), connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString()));
 
 			taskManager = actorSystem.actorOf(tmProps);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 2b91cd4..0369771 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, Submitt
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.metrics.MetricRegistryImpl
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 75f0aa4..16e238b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -80,6 +80,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 95ba154..3bdc2ac 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -465,7 +465,11 @@ public class YarnApplicationMasterRunner {
 		}
 
 		if (metricRegistry != null) {
-			metricRegistry.shutdown();
+			try {
+				metricRegistry.shutdown();
+			} catch (Throwable t) {
+				LOG.error("Could not properly shut down the metric registry.", t);
+			}
 		}
 
 		org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6feb287..b32d25c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -116,7 +116,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
 			SlotManager slotManager,
-			MetricRegistryImpl metricRegistry,
+			MetricRegistry metricRegistry,
 			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 		super(

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 439fdf3..e1efb54 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -78,7 +78,7 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			MetricRegistryImpl metricRegistry,
+			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index a13f62c..042644b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -68,7 +68,7 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
 			HeartbeatServices heartbeatServices,
-			MetricRegistryImpl metricRegistry,
+			MetricRegistry metricRegistry,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 		final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);