You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/04/01 20:36:45 UTC
flink git commit: [FLINK-3689] fix shutdown of JM when RM is not available
Repository: flink
Updated Branches:
refs/heads/master 9aa207c07 -> c2f2122c0
[FLINK-3689] fix shutdown of JM when RM is not available
This closes #1847.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2f2122c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2f2122c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2f2122c
Branch: refs/heads/master
Commit: c2f2122c0ba0d0b16eb59aa3ad7958c72fb486b7
Parents: 9aa207c
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Apr 1 14:21:50 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Apr 1 20:24:05 2016 +0200
----------------------------------------------------------------------
.../flink/runtime/jobmanager/JobManager.scala | 10 +-
.../resourcemanager/ClusterShutdownITCase.java | 107 +++++++++++++++++--
2 files changed, 103 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c2f2122c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index baffa2a..ba9e1ef 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -987,20 +987,16 @@ class JobManager(
// send resource manager the ok
currentResourceManager match {
case Some(rm) =>
-
// inform rm
rm ! decorateMessage(msg)
-
- sender() ! decorateMessage(StopClusterSuccessful.getInstance())
-
- // trigger shutdown
- shutdown()
-
case None =>
// ResourceManager not available
// we choose not to wait here because it might block the shutdown forever
}
+ sender() ! decorateMessage(StopClusterSuccessful.getInstance())
+ shutdown()
+
case RequestLeaderSessionID =>
sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
http://git-wip-us.apache.org/repos/asf/flink/blob/c2f2122c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index 5ea7d76..f23db09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -18,34 +18,46 @@
package org.apache.flink.runtime.resourcemanager;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
import org.junit.Test;
-import org.mockito.Mockito;
+import org.junit.runners.MethodSorters;
import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ExecutorService;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Starts a dedicated Actor system and runs a shutdown test to shut it down.
*/
+// The last test shuts down the actor system
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ClusterShutdownITCase extends TestLogger {
private static ActorSystem system;
@@ -63,10 +75,52 @@ public class ClusterShutdownITCase extends TestLogger {
}
/**
- * Tests cluster shutdown procedure of RM
+ * Tests a faked cluster shutdown procedure with and without the ResourceManager.
*/
@Test
- public void testClusterShutdown() {
+ public void testClusterShutdownScenarios() {
+
+ new JavaTestKit(system){{
+ new Within(duration("30 seconds")) {
+ @Override
+ protected void run() {
+
+ ActorGateway me =
+ TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+
+ // start job manager which doesn't shutdown the actor system
+ ActorGateway jobManager =
+ TestingUtils.createJobManager(system, config, TestJobManager.class, "fakeShutdown");
+
+ // No resource manager connected
+ jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
+
+ expectMsgClass(StopClusterSuccessful.class);
+
+ // Start resource manager and let it register
+ ActorGateway resourceManager =
+ TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+ // notify about a resource manager registration at the job manager
+ resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+ // Wait for resource manager
+ expectMsgEquals(Messages.getAcknowledge());
+
+ // Resource Manager connected
+ jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
+
+ expectMsgClass(StopClusterSuccessful.class);
+
+ }};
+ }};
+ }
+
+ /**
+ * Tests proper cluster shutdown procedure of RM.
+ */
+ @Test
+ public void testProperClusterShutdown() {
new JavaTestKit(system){{
new Within(duration("30 seconds")) {
@@ -107,4 +161,43 @@ public class ClusterShutdownITCase extends TestLogger {
}};
}
+ /**
+ * A job manager which doesn't execute the final shutdown code
+ */
+ private static class TestJobManager extends JobManager {
+
+ public TestJobManager(Configuration flinkConfiguration,
+ ExecutorService executorService,
+ InstanceManager instanceManager,
+ Scheduler scheduler,
+ BlobLibraryCacheManager libraryCacheManager,
+ ActorRef archive,
+ RestartStrategy defaultRestartStrategy,
+ FiniteDuration timeout,
+ LeaderElectionService leaderElectionService,
+ SubmittedJobGraphStore submittedJobGraphs,
+ CheckpointRecoveryFactory checkpointRecoveryFactory,
+ SavepointStore savepointStore,
+ FiniteDuration jobRecoveryTimeout) {
+ super(flinkConfiguration,
+ executorService,
+ instanceManager,
+ scheduler,
+ libraryCacheManager,
+ archive,
+ defaultRestartStrategy,
+ timeout,
+ leaderElectionService,
+ submittedJobGraphs,
+ checkpointRecoveryFactory,
+ savepointStore,
+ jobRecoveryTimeout);
+ }
+
+ @Override
+ public void shutdown() {
+ // do not shutdown
+ }
+ }
+
}