You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/14 23:07:58 UTC
git commit: [YARN] Improved the check for resource availability within the cluster
Repository: incubator-flink
Updated Branches:
refs/heads/master 08f189ad3 -> 708426ba6
[YARN] Improved the check for resource availability within the cluster
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/708426ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/708426ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/708426ba
Branch: refs/heads/master
Commit: 708426ba6783bc600e1b818c78d90d567a3734a4
Parents: 08f189a
Author: Robert Metzger <rm...@apache.org>
Authored: Sat Jun 14 12:14:05 2014 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Sat Jun 14 18:12:33 2014 +0200
----------------------------------------------------------------------
.../main/java/eu/stratosphere/yarn/Client.java | 43 +++++++++++++++++++-
1 file changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/708426ba/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
index 4ef1456..3495dbb 100644
--- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
+++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -338,6 +339,26 @@ public class Client {
yarnClient.stop();
System.exit(1);
}
+ int totalMemoryRequired = jmMemory + tmMemory * taskManagerCount;
+ ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient);
+ if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
+ LOG.fatal("This YARN session requires "+totalMemoryRequired+"MB of memory in the cluster. "
+ + "There are currently only "+freeClusterMem.totalFreeMemory+"MB available.");
+ yarnClient.stop();
+ System.exit(1);
+ }
+ if( tmMemory > freeClusterMem.containerLimit) {
+ LOG.fatal("The requested amount of memory for the TaskManagers ("+tmMemory+"MB) is more than "
+ + "the largest possible YARN container: "+freeClusterMem.containerLimit);
+ yarnClient.stop();
+ System.exit(1);
+ }
+ if( jmMemory > freeClusterMem.containerLimit) {
+ LOG.fatal("The requested amount of memory for the JobManager ("+jmMemory+"MB) is more than "
+ + "the largest possible YARN container: "+freeClusterMem.containerLimit);
+ yarnClient.stop();
+ System.exit(1);
+ }
// respect custom JVM options in the YAML file
final String javaOpts = GlobalConfiguration.getString(ConfigConstants.STRATOSPHERE_JVM_OPTIONS, "");
@@ -495,6 +516,24 @@ public class Client {
}
}
+ private static class ClusterResourceDescription {
+ public int totalFreeMemory;
+ public int containerLimit;
+ }
+ private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
+ ClusterResourceDescription crd = new ClusterResourceDescription();
+ crd.totalFreeMemory = 0;
+ crd.containerLimit = 0;
+ List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
+ for(NodeReport rep : nodes) {
+ int free = rep.getCapability().getMemory() - (rep.getUsed() != null ? rep.getUsed().getMemory() : 0 );
+ crd.totalFreeMemory += free;
+ if(free > crd.containerLimit) {
+ crd.containerLimit = free;
+ }
+ }
+ return crd;
+ }
private void printUsage() {
System.out.println("Usage:");
@@ -523,8 +562,8 @@ public class Client {
private void showClusterMetrics(YarnClient yarnClient)
throws YarnException, IOException {
YarnClusterMetrics metrics = yarnClient.getYarnClusterMetrics();
- System.out.println("NodeManagers in Cluster " + metrics.getNumNodeManagers());
- List<NodeReport> nodes = yarnClient.getNodeReports();
+ System.out.println("NodeManagers in the Cluster " + metrics.getNumNodeManagers());
+ List<NodeReport> nodes = yarnClient.getNodeReports(NodeState.RUNNING);
final String format = "|%-16s |%-16s %n";
System.out.printf("|Property |Value %n");
System.out.println("+---------------------------------------+");