You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/09/24 11:48:42 UTC

[2/2] flink git commit: [FLINK-4555] wait for ResourceManager to cleanly unregister application

[FLINK-4555] wait for ResourceManager to cleanly unregister application

This ensures that the ResourceManager has enough time to unregister the
application before shutting down.

This closes #2514


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e4b7ebd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e4b7ebd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e4b7ebd

Branch: refs/heads/master
Commit: 1e4b7ebd4aa7dba9aa87122fec9db561df198160
Parents: 40c978b
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 21 18:45:37 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Sat Sep 24 13:45:50 2016 +0200

----------------------------------------------------------------------
 .../runtime/clusterframework/FlinkResourceManager.java   |  8 +++++---
 .../org/apache/flink/runtime/jobmanager/JobManager.scala | 11 +++++++++--
 2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e4b7ebd/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 7ea286d..911c1f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -33,16 +33,17 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
 import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
-import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
 import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
 import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
 import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
 import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
 import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -253,6 +254,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
 			else if (message instanceof StopCluster) {
 				StopCluster msg = (StopCluster) message;
 				shutdownCluster(msg.finalStatus(), msg.message());
+				sender().tell(decorateMessage(StopClusterSuccessful.getInstance()), ActorRef.noSender());
 			}
 
 			// --- miscellaneous messages

http://git-wip-us.apache.org/repos/asf/flink/blob/1e4b7ebd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index fd96f86..a733943 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1024,8 +1024,15 @@ class JobManager(
       // send resource manager the ok
       currentResourceManager match {
         case Some(rm) =>
-          // inform rm
-          rm ! decorateMessage(msg)
+          try {
+            // inform rm and wait for it to confirm
+            val waitTime = FiniteDuration(5, TimeUnit.SECONDS)
+            val answer = (rm ? decorateMessage(msg))(waitTime)
+            Await.ready(answer, waitTime)
+          } catch {
+            case e: TimeoutException =>
+            case e: InterruptedException =>
+          }
         case None =>
           // ResourceManager not available
           // we choose not to wait here because it might block the shutdown forever