You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/01 08:01:38 UTC

[3/5] flink git commit: [FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor

[FLINK-5488] Close YarnClient on error in AbstractYarnClusterDescriptor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17c5de5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17c5de5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17c5de5a

Branch: refs/heads/release-1.3
Commit: 17c5de5a35b4fa72e3636539dfec2cd9135ef91d
Parents: db7f0ff
Author: zjureel <zj...@gmail.com>
Authored: Wed May 31 13:13:34 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Fri Jun 30 08:51:49 2017 +0200

----------------------------------------------------------------------
 .../flink/yarn/AbstractYarnClusterDescriptor.java    | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17c5de5a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b9a4416..bfb9625 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -385,6 +385,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 	@Override
 	public YarnClusterClient retrieve(String applicationID) {
 
+		YarnClient yarnClient = null;
 		try {
 			// check if required Hadoop environment variables are set. If not, warn user
 			if (System.getenv("HADOOP_CONF_DIR") == null &&
@@ -395,7 +396,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			}
 
 			final ApplicationId yarnAppId = ConverterUtils.toApplicationId(applicationID);
-			final YarnClient yarnClient = getYarnClient();
+			yarnClient = getYarnClient();
 			final ApplicationReport appReport = yarnClient.getApplicationReport(yarnAppId);
 
 			if (appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
@@ -413,6 +414,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 			return createYarnClusterClient(this, yarnClient, appReport, flinkConfiguration, false);
 		} catch (Exception e) {
+			if (null != yarnClient) {
+				yarnClient.stop();
+			}
 			throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
 		}
 	}
@@ -533,7 +537,14 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			"The allocation might take more time than usual because the Flink YARN client needs to wait until " +
 			"the resources become available.";
 		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
-		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		ClusterResourceDescription freeClusterMem;
+		try {
+			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		} catch (YarnException | IOException e) {
+			failSessionDuringDeployment(yarnClient, yarnApplication);
+			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
+		}
+
 		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
 			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the cluster. "
 				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available." + NOTE_RSC);