You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2016/09/24 11:48:42 UTC
[2/2] flink git commit: [FLINK-4555] wait for ResourceManager to
cleanly unregister application
[FLINK-4555] wait for ResourceManager to cleanly unregister application
This ensures that the ResourceManager has enough time to unregister the
application before shutting down.
This closes #2514
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e4b7ebd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e4b7ebd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e4b7ebd
Branch: refs/heads/master
Commit: 1e4b7ebd4aa7dba9aa87122fec9db561df198160
Parents: 40c978b
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Sep 21 18:45:37 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Sat Sep 24 13:45:50 2016 +0200
----------------------------------------------------------------------
.../runtime/clusterframework/FlinkResourceManager.java | 8 +++++---
.../org/apache/flink/runtime/jobmanager/JobManager.scala | 11 +++++++++--
2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1e4b7ebd/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index 7ea286d..911c1f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -33,16 +33,17 @@ import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
-import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -253,6 +254,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrieva
else if (message instanceof StopCluster) {
StopCluster msg = (StopCluster) message;
shutdownCluster(msg.finalStatus(), msg.message());
+ sender().tell(decorateMessage(StopClusterSuccessful.getInstance()), ActorRef.noSender());
}
// --- miscellaneous messages
http://git-wip-us.apache.org/repos/asf/flink/blob/1e4b7ebd/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index fd96f86..a733943 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1024,8 +1024,15 @@ class JobManager(
// send resource manager the ok
currentResourceManager match {
case Some(rm) =>
- // inform rm
- rm ! decorateMessage(msg)
+ try {
+ // inform rm and wait for it to confirm
+ val waitTime = FiniteDuration(5, TimeUnit.SECONDS)
+ val answer = (rm ? decorateMessage(msg))(waitTime)
+ Await.ready(answer, waitTime)
+ } catch {
+ case e: TimeoutException =>
+ case e: InterruptedException =>
+ }
case None =>
// ResourceManager not available
// we choose not to wait here because it might block the shutdown forever