You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:27:59 UTC
[7/8] flink git commit: [FLINK-6708] [yarn] Minor improvements to
YARN session HA fixes
[FLINK-6708] [yarn] Minor improvements to YARN session HA fixes
This closes #3981.
This closes #3982.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd914629
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd914629
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd914629
Branch: refs/heads/master
Commit: fd9146295b7af6ce1a2976a52a71add1c5a37f99
Parents: 2a77867
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Fri May 26 15:00:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri May 26 16:27:18 2017 +0800
----------------------------------------------------------------------
.../apache/flink/yarn/AbstractYarnClusterDescriptor.java | 6 +++++-
.../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java | 2 +-
.../main/scala/org/apache/flink/yarn/YarnJobManager.scala | 10 +++++++---
3 files changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fd914629/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 818a3e8..044d1e7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -1235,7 +1235,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
LOG.info("Deleting files in {}.", yarnFilesDir);
try {
FileSystem fs = FileSystem.get(conf);
- fs.delete(yarnFilesDir, true);
+
+ if (!fs.delete(yarnFilesDir, true)) {
+ throw new IOException("Deleting files in " + yarnFilesDir + " was unsuccessful");
+ }
+
fs.close();
} catch (IOException e) {
LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
http://git-wip-us.apache.org/repos/asf/flink/blob/fd914629/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d2a4340..53253d6 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -424,7 +424,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
numTaskmanagers = status.numRegisteredTaskManagers();
}
} catch (Exception e) {
- LOG.warn("Could not retrieve the current cluster status. Retrying...", e);
+ LOG.warn("Could not retrieve the current cluster status. Skipping current retrieval attempt ...", e);
}
List<String> messages = yarnCluster.getNewMessages();
http://git-wip-us.apache.org/repos/asf/flink/blob/fd914629/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 902553f..e094bb7 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -100,8 +100,8 @@ class YarnJobManager(
handleYarnShutdown orElse super.handleMessage
}
- def handleYarnShutdown: Receive = {
- case msg:StopCluster =>
+ private def handleYarnShutdown: Receive = {
+ case msg: StopCluster =>
super.handleMessage(msg)
// do global cleanup if the yarn files path has been set
@@ -113,7 +113,11 @@ class YarnJobManager(
try {
val fs = path.getFileSystem
- fs.delete(path, true)
+
+ if (!fs.delete(path, true)) {
+ throw new IOException(s"Deleting yarn application files under $filePath " +
+ s"was unsuccessful.")
+ }
} catch {
case ioe: IOException =>
log.warn(