You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/14 23:07:58 UTC

git commit: [YARN] Improved the check for resource availability within the cluster

Repository: incubator-flink
Updated Branches:
  refs/heads/master 08f189ad3 -> 708426ba6


[YARN] Improved the check for resource availability within the cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/708426ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/708426ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/708426ba

Branch: refs/heads/master
Commit: 708426ba6783bc600e1b818c78d90d567a3734a4
Parents: 08f189a
Author: Robert Metzger <rm...@apache.org>
Authored: Sat Jun 14 12:14:05 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Sat Jun 14 18:12:33 2014 +0200

----------------------------------------------------------------------
 .../main/java/eu/stratosphere/yarn/Client.java  | 43 +++++++++++++++++++-
 1 file changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/708426ba/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
index 4ef1456..3495dbb 100644
--- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
+++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -338,6 +339,26 @@ public class Client {
 			yarnClient.stop();
 			System.exit(1);
 		}
+		int totalMemoryRequired = jmMemory + tmMemory * taskManagerCount;
+		ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+		if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+			LOG.fatal("This YARN session requires "+totalMemoryRequired+"MB of memory in the cluster. "
+					+ "There are currently only "+freeClusterMem.totalFreeMemory+"MB available.");
+			yarnClient.stop();
+			System.exit(1);
+		}
+		if( tmMemory > freeClusterMem.containerLimit) {
+			LOG.fatal("The requested amount of memory for the TaskManagers ("+tmMemory+"MB) is more than "
+					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
+			yarnClient.stop();
+			System.exit(1);
+		}
+		if( jmMemory > freeClusterMem.containerLimit) {
+			LOG.fatal("The requested amount of memory for the JobManager ("+jmMemory+"MB) is more than "
+					+ "the largest possible YARN container: "+freeClusterMem.containerLimit);
+			yarnClient.stop();
+			System.exit(1);
+		}
 		
 		// respect custom JVM options in the YAML file
 		final String javaOpts = GlobalConfiguration.getString(ConfigConstants.STRATOSPHERE_JVM_OPTIONS, "");
@@ -495,6 +516,24 @@ public class Client {
 		}
 		
 	}
+	private static class ClusterResourceDescription {
+		public int totalFreeMemory;
+		public int containerLimit;
+	}
+	private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
+		ClusterResourceDescription crd = new ClusterResourceDescription();
+		crd.totalFreeMemory = 0;
+		crd.containerLimit = 0;
+		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+		for(NodeReport rep : nodes) {
+			int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
+			crd.totalFreeMemory += free;
+			if(free > crd.containerLimit) {
+				crd.containerLimit = free;
+			}
+		}
+		return crd;
+	}
 
 	private void printUsage() {
 		System.out.println("Usage:");
@@ -523,8 +562,8 @@ public class Client {
 	private void showClusterMetrics(YarnClient yarnClient)
 			throws YarnException, IOException {
 		YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
-		System.out.println("NodeManagers in Cluster " + metrics.getNumNodeManagers());
-		List<NodeReport> nodes = yarnClient.getNodeReports();
+		System.out.println("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
+		List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
 		final String format = "|%-16s |%-16s %n";
 		System.out.printf("|Property         |Value          %n");
 		System.out.println("+---------------------------------------+");