You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/04/01 20:36:45 UTC

flink git commit: [FLINK-3689] fix shutdown of JM when RM is not available

Repository: flink
Updated Branches:
  refs/heads/master 9aa207c07 -> c2f2122c0


[FLINK-3689] fix shutdown of the JobManager (JM) when the ResourceManager (RM) is not available

This closes #1847.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2f2122c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2f2122c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2f2122c

Branch: refs/heads/master
Commit: c2f2122c0ba0d0b16eb59aa3ad7958c72fb486b7
Parents: 9aa207c
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Apr 1 14:21:50 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Apr 1 20:24:05 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  10 +-
 .../resourcemanager/ClusterShutdownITCase.java  | 107 +++++++++++++++++--
 2 files changed, 103 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2f2122c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index baffa2a..ba9e1ef 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -987,20 +987,16 @@ class JobManager(
       // send resource manager the ok
       currentResourceManager match {
         case Some(rm) =>
-
           // inform rm
           rm ! decorateMessage(msg)
-
-          sender() ! decorateMessage(StopClusterSuccessful.getInstance())
-
-          // trigger shutdown
-          shutdown()
-
         case None =>
           // ResourceManager not available
           // we choose not to wait here beacuse it might block the shutdown forever
       }
 
+      sender() ! decorateMessage(StopClusterSuccessful.getInstance())
+      shutdown()
+
     case RequestLeaderSessionID =>
       sender() ! ResponseLeaderSessionID(leaderSessionID.orNull)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c2f2122c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
index 5ea7d76..f23db09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
@@ -18,34 +18,46 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.SavepointStore;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.instance.InstanceManager;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.RegistrationMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
 import org.junit.Test;
-import org.mockito.Mockito;
+import org.junit.runners.MethodSorters;
 import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.ExecutorService;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 
 /**
  * Starts a dedicated Actor system and runs a shutdown test to shut it down.
  */
+// The last test shuts down the actor system
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class ClusterShutdownITCase extends TestLogger {
 
 	private static ActorSystem system;
@@ -63,10 +75,52 @@ public class ClusterShutdownITCase extends TestLogger {
 	}
 
 	/**
-	 * Tests cluster shutdown procedure of RM
+	 * Tests a faked cluster shutdown procedure with and without the ResourceManager.
 	 */
 	@Test
-	public void testClusterShutdown() {
+	public void testClusterShutdownScenarios() {
+
+		new JavaTestKit(system){{
+		new Within(duration("30 seconds")) {
+		@Override
+		protected void run() {
+
+			ActorGateway me =
+				TestingUtils.createForwardingActor(system, getTestActor(), Option.<String>empty());
+
+			// start job manager which doesn't shutdown the actor system
+			ActorGateway jobManager =
+				TestingUtils.createJobManager(system, config, TestJobManager.class, "fakeShutdown");
+
+			// No resource manager connected
+			jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
+
+			expectMsgClass(StopClusterSuccessful.class);
+
+			// Start resource manager and let it register
+			ActorGateway resourceManager =
+				TestingUtils.createResourceManager(system, jobManager.actor(), config);
+
+			// notify about a resource manager registration at the job manager
+			resourceManager.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
+
+			// Wait for resource manager
+			expectMsgEquals(Messages.getAcknowledge());
+
+			// Resource Manager connected
+			jobManager.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
+
+			expectMsgClass(StopClusterSuccessful.class);
+
+		}};
+		}};
+	}
+
+	/**
+	 * Tests proper cluster shutdown procedure of RM.
+	 */
+	@Test
+	public void testProperClusterShutdown() {
 
 		new JavaTestKit(system){{
 		new Within(duration("30 seconds")) {
@@ -107,4 +161,43 @@ public class ClusterShutdownITCase extends TestLogger {
 		}};
 	}
 
+	/**
+	 * A job manager which doesn't execute the final shutdown code
+	 */
+	private static class TestJobManager extends JobManager {
+
+		public TestJobManager(Configuration flinkConfiguration,
+			 ExecutorService executorService,
+			 InstanceManager instanceManager,
+			 Scheduler scheduler,
+			 BlobLibraryCacheManager libraryCacheManager,
+			 ActorRef archive,
+			 RestartStrategy defaultRestartStrategy,
+			 FiniteDuration timeout,
+			 LeaderElectionService leaderElectionService,
+			 SubmittedJobGraphStore submittedJobGraphs,
+			 CheckpointRecoveryFactory checkpointRecoveryFactory,
+			 SavepointStore savepointStore,
+			 FiniteDuration jobRecoveryTimeout) {
+			super(flinkConfiguration,
+			 executorService,
+			 instanceManager,
+			 scheduler,
+			 libraryCacheManager,
+			 archive,
+			 defaultRestartStrategy,
+			 timeout,
+			 leaderElectionService,
+			 submittedJobGraphs,
+			 checkpointRecoveryFactory,
+			 savepointStore,
+			 jobRecoveryTimeout);
+		}
+
+		@Override
+		public void shutdown() {
+			// do not shutdown
+		}
+	}
+
 }