You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 08:01:38 UTC
[3/5] flink git commit: [FLINK-5488] Close YarnClient on error in
AbstractYarnClusterDescriptor
[FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17c5de5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17c5de5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17c5de5a
Branch: refs/heads/release-1.3
Commit: 17c5de5a35b4fa72e3636539dfec2cd9135ef91d
Parents: db7f0ff
Author: zjureel <zj...@gmail.com>
Authored: Wed May 31 13:13:34 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 30 08:51:49 2017 +0200
----------------------------------------------------------------------
.../flink/yarn/AbstractYarnClusterDescriptor.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/17c5de5a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b9a4416..bfb9625 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -385,6 +385,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
@Override
public YarnClusterClient retrieve(String applicationID) {
+ YarnClient yarnClient = null;
try {
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null &&
@@ -395,7 +396,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
- final YarnClient yarnClient = getYarnClient();
+ yarnClient = getYarnClient();
final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
@@ -413,6 +414,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
} catch (Exception e) {
+ if (null != yarnClient) {
+ yarnClient.stop();
+ }
throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
}
}
@@ -533,7 +537,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
"the resources become available.";
int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
- ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ ClusterResourceDescription freeClusterMem;
+ try {
+ freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ } catch (YarnException | IOException e) {
+ failSessionDuringDeployment(yarnClient, yarnApplication);
+ throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
+ }
+
if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);