You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 22:27:45 UTC
[5/7] flink git commit: [FLINK-7876] Properly start and shutdown
MetricRegistry by ClusterEntrypoint
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index e4ceb40..72c03af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -18,15 +18,6 @@
package org.apache.flink.runtime.leaderelection;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
@@ -42,25 +33,37 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+
+import java.util.concurrent.TimeUnit;
+
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import java.util.concurrent.TimeUnit;
-
public class JobManagerLeaderElectionTest extends TestLogger {
@Rule
@@ -198,7 +201,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
submittedJobGraphStore,
checkpointRecoveryFactory,
AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
- Option.<MetricRegistryImpl>empty(),
+ new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
Option.<String>empty());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
index 1140e3d..46d6548 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
@@ -23,6 +23,8 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import javax.annotation.Nullable;
+
/**
* Metric registry which does nothing and is intended for testing purposes.
*/
@@ -57,4 +59,10 @@ public class NoOpMetricRegistry implements MetricRegistry {
public ScopeFormats getScopeFormats() {
return scopeFormats;
}
+
+ @Nullable
+ @Override
+ public String getMetricQueryServicePath() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index 31304e5..d934ea9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
@@ -94,8 +96,12 @@ public class TaskManagerMetricsTest extends TestLogger {
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
- tmResourceID,
- metricRegistry);
+ tmResourceID);
+
+ TaskManagerMetricGroup taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
+ metricRegistry,
+ taskManagerServices.getTaskManagerLocation(),
+ taskManagerServices.getNetworkEnvironment());
// create the task manager
final Props tmProps = TaskManager.getTaskManagerProps(
@@ -107,7 +113,7 @@ public class TaskManagerMetricsTest extends TestLogger {
taskManagerServices.getIOManager(),
taskManagerServices.getNetworkEnvironment(),
highAvailabilityServices,
- taskManagerServices.getTaskManagerMetricGroup());
+ taskManagerMetricGroup);
final ActorRef taskManager = actorSystem.actorOf(tmProps);
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index 3d29815..7065e6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -22,22 +22,22 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
import java.util.UUID;
public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
- private static final MetricRegistryImpl EMPTY_REGISTRY = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ private static final MetricRegistry EMPTY_REGISTRY = new NoOpMetricRegistry();
public UnregisteredTaskMetricsGroup() {
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
index 7b10db6..faa97a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -72,8 +71,7 @@ public class MetricFetcherTest extends TestLogger {
// ========= setup TaskManager =================================================================================
JobID jobID = new JobID();
- InstanceID tmID = new InstanceID();
- ResourceID tmRID = new ResourceID(tmID.toString());
+ ResourceID tmRID = ResourceID.generate();
// ========= setup JobManager ==================================================================================
JobDetails details = mock(JobDetails.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 7c6b7dd..1f1d09d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -153,7 +153,6 @@ public class TaskExecutorITCase extends TestLogger {
networkEnvironment,
testingHAServices,
heartbeatServices,
- metricRegistry,
taskManagerMetricGroup,
broadcastVariableManager,
fileCache,
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index fcd6e4e..de807a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -45,8 +45,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
@@ -61,7 +61,6 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -114,7 +113,16 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.RETURNS_MOCKS;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TaskExecutorTest extends TestLogger {
@@ -205,7 +213,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
heartbeatServices,
- mock(MetricRegistryImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -311,7 +318,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
heartbeatServices,
- mock(MetricRegistryImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -429,7 +435,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
heartbeatServices,
- mock(MetricRegistryImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -522,7 +527,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
- mock(MetricRegistryImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -605,7 +609,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
- mock(MetricRegistryImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -747,7 +750,6 @@ public class TaskExecutorTest extends TestLogger {
networkEnvironment,
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
- mock(MetricRegistryImpl.class),
taskManagerMetricGroup,
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -863,7 +865,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
- mock(MetricRegistryImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -981,7 +982,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
- mock(MetricRegistryImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -1074,7 +1074,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
- mock(MetricRegistryImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -1248,7 +1247,6 @@ public class TaskExecutorTest extends TestLogger {
networkMock,
haServices,
mock(HeartbeatServices.class, RETURNS_MOCKS),
- mock(MetricRegistryImpl.class),
taskManagerMetricGroup,
mock(BroadcastVariableManager.class),
mock(FileCache.class),
@@ -1371,7 +1369,6 @@ public class TaskExecutorTest extends TestLogger {
mock(NetworkEnvironment.class),
haServicesMock,
heartbeatServicesMock,
- mock(MetricRegistryImpl.class),
mock(TaskManagerMetricGroup.class),
mock(BroadcastVariableManager.class),
mock(FileCache.class),
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 8249fca..4b62770 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -18,14 +18,6 @@
package org.apache.flink.runtime.taskmanager;
-import static org.junit.Assert.*;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.actor.Props;
-import akka.testkit.JavaTestKit;
-
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
@@ -49,21 +41,28 @@ import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
-
import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.actor.Props;
+import akka.testkit.JavaTestKit;
import org.junit.Test;
+import java.net.InetAddress;
+import java.util.concurrent.TimeUnit;
+
import scala.Option;
import scala.concurrent.duration.FiniteDuration;
-import java.net.InetAddress;
-import java.util.concurrent.TimeUnit;
+import static org.junit.Assert.assertTrue;
public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
@@ -169,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
network,
numberOfSlots,
highAvailabilityServices,
- new MetricRegistryImpl(metricRegistryConfiguration));
+ new TaskManagerMetricGroup(new NoOpMetricRegistry(), connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString()));
taskManager = actorSystem.actorOf(tmProps);
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 2b91cd4..0369771 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, Submitt
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.metrics.MetricRegistryImpl
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 75f0aa4..16e238b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -80,6 +80,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Multimap;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
+
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 95ba154..3bdc2ac 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -465,7 +465,11 @@ public class YarnApplicationMasterRunner {
}
if (metricRegistry != null) {
- metricRegistry.shutdown();
+ try {
+ metricRegistry.shutdown();
+ } catch (Throwable t) {
+ LOG.error("Could not properly shut down the metric registry.", t);
+ }
}
org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6feb287..b32d25c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -116,7 +116,7 @@ public class YarnResourceManager extends ResourceManager<ResourceID> implements
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
- MetricRegistryImpl metricRegistry,
+ MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler) {
super(
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
index 439fdf3..e1efb54 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnJobClusterEntrypoint.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -78,7 +78,7 @@ public class YarnJobClusterEntrypoint extends JobClusterEntrypoint {
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
- MetricRegistryImpl metricRegistry,
+ MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
index a13f62c..042644b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnSessionClusterEntrypoint.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -68,7 +68,7 @@ public class YarnSessionClusterEntrypoint extends SessionClusterEntrypoint {
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
- MetricRegistryImpl metricRegistry,
+ MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);