You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:27:59 UTC

[7/8] flink git commit: [FLINK-6708] [yarn] Minor improvements to YARN session HA fixes

[FLINK-6708] [yarn] Minor improvements to YARN session HA fixes

This closes #3981.
This closes #3982.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd914629
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd914629
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd914629

Branch: refs/heads/master
Commit: fd9146295b7af6ce1a2976a52a71add1c5a37f99
Parents: 2a77867
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 26 15:00:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:27:18 2017 +0800

----------------------------------------------------------------------
 .../apache/flink/yarn/AbstractYarnClusterDescriptor.java  |  6 +++++-
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    |  2 +-
 .../main/scala/org/apache/flink/yarn/YarnJobManager.scala | 10 +++++++---
 3 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd914629/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 818a3e8..044d1e7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -1235,7 +1235,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			LOG.info("Deleting files in {}.", yarnFilesDir);
 			try {
 				FileSystem fs = FileSystem.get(conf);
-				fs.delete(yarnFilesDir, true);
+
+				if (!fs.delete(yarnFilesDir, true)) {
+					throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
+				}
+
 				fs.close();
 			} catch (IOException e) {
 				LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/fd914629/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d2a4340..53253d6 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -424,7 +424,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 						numTaskmanagers = status.numRegisteredTaskManagers();
 					}
 				} catch (Exception e) {
-					LOG.warn("Could not retrieve the current cluster status. Retrying...", e);
+					LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
 				}
 
 				List<String> messages = yarnCluster.getNewMessages();

http://git-wip-us.apache.org/repos/asf/flink/blob/fd914629/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 902553f..e094bb7 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -100,8 +100,8 @@ class YarnJobManager(
     handleYarnShutdown orElse super.handleMessage
   }
 
-  def handleYarnShutdown: Receive = {
-    case msg:StopCluster =>
+  private def handleYarnShutdown: Receive = {
+    case msg: StopCluster =>
       super.handleMessage(msg)
 
       // do global cleanup if the yarn files path has been set
@@ -113,7 +113,11 @@ class YarnJobManager(
 
           try {
             val fs = path.getFileSystem
-            fs.delete(path, true)
+
+            if (!fs.delete(path, true)) {
+              throw new IOException(s"Deleting yarn application files under $filePath " +
+                s"was unsuccessful.")
+            }
           } catch {
             case ioe: IOException =>
               log.warn(