Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/01/08 06:29:37 UTC

hadoop git commit: YARN-7242. Support specifying values of different resource types in DistributedShell for easier testing. Contributed by Gergely Novák.

Repository: hadoop
Updated Branches:
  refs/heads/trunk c2d6fa365 -> 01f3f2167


YARN-7242. Support specifying values of different resource types in DistributedShell for easier testing. Contributed by Gergely Novák.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/01f3f216
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/01f3f216
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/01f3f216

Branch: refs/heads/trunk
Commit: 01f3f2167ec20b52a18bc2cf250fb4229cfd2c14
Parents: c2d6fa3
Author: Sunil G <su...@apache.org>
Authored: Mon Jan 8 11:59:06 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Mon Jan 8 11:59:06 2018 +0530

----------------------------------------------------------------------
 .../distributedshell/ApplicationMaster.java     |  20 ++
 .../applications/distributedshell/Client.java   | 182 +++++++++++++++----
 .../distributedshell/TestDistributedShell.java  | 175 +++++++++++++++++-
 3 files changed, 338 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/01f3f216/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index b3fa0ff..85496d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.log4j.LogManager;
 
@@ -244,6 +245,9 @@ public class ApplicationMaster {
   // VirtualCores to request for the container on which the shell command will run
   private static final int DEFAULT_CONTAINER_VCORES = 1;
   private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
+  // All other resources to request for the container
+  // on which the shell command will run
+  private Map<String, Long> containerResources = new HashMap<>();
   // Priority of the request
   private int requestPriority;
   // Execution type of the containers.
@@ -431,6 +435,10 @@ public class ApplicationMaster {
         "Amount of memory in MB to be requested to run the shell command");
     opts.addOption("container_vcores", true,
         "Amount of virtual cores to be requested to run the shell command");
+    opts.addOption("container_resources", true,
+        "Amount of resources to be requested to run the shell command. " +
+        "Specified as resource type=value pairs separated by commas. " +
+        "E.g. -container_resources memory-mb=512,vcores=1");
     opts.addOption("container_resource_profile", true,
         "Resource profile to be requested to run the shell command");
     opts.addOption("num_containers", true,
@@ -590,6 +598,14 @@ public class ApplicationMaster {
         "container_memory", "-1"));
     containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
         "container_vcores", "-1"));
+    containerResources = new HashMap<>();
+    if (cliParser.hasOption("container_resources")) {
+      Map<String, Long> resources = Client.parseResourcesString(
+          cliParser.getOptionValue("container_resources"));
+      for (Map.Entry<String, Long> entry : resources.entrySet()) {
+        containerResources.put(entry.getKey(), entry.getValue());
+      }
+    }
     containerResourceProfile =
         cliParser.getOptionValue("container_resource_profile", "");
     numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
@@ -711,6 +727,7 @@ public class ApplicationMaster {
         .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
             appMasterTrackingUrl);
     resourceProfiles = response.getResourceProfiles();
+    ResourceUtils.reinitializeResources(response.getResourceTypes());
     // Dump out information about cluster capability as seen by the
     // resource manager
     long maxMem = response.getMaximumResourceCapability().getMemorySize();
@@ -1593,6 +1610,9 @@ public class ApplicationMaster {
               containerVirtualCores;
       resourceCapability.setMemorySize(containerMemory);
       resourceCapability.setVirtualCores(containerVirtualCores);
+      for (Map.Entry<String, Long> entry : containerResources.entrySet()) {
+        resourceCapability.setResourceValue(entry.getKey(), entry.getValue());
+      }
     }
 
     String profileName = containerResourceProfile;
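
The loop added near the end of the ApplicationMaster hunk above is the heart of the AM-side change: every type/value pair parsed from -container_resources is applied to the container capability through Resource#setResourceValue. A minimal, self-contained sketch of the same pattern follows; the yarn.io/gpu type is only an illustration and has to be registered with the cluster (e.g. in resource-types.xml) for the call to succeed:

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.hadoop.yarn.api.records.Resource;
  import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;

  public class ResourceCapabilitySketch {
    public static void main(String[] args) {
      // Values as they would arrive from Client.parseResourcesString(...)
      Map<String, Long> containerResources = new HashMap<>();
      containerResources.put("yarn.io/gpu", 2L);  // illustrative custom type

      // Start from the memory/vcores pair, then layer the extra types on top
      Resource capability = Resource.newInstance(512, 1);
      for (Map.Entry<String, Long> entry : containerResources.entrySet()) {
        try {
          capability.setResourceValue(entry.getKey(), entry.getValue());
        } catch (ResourceNotFoundException e) {
          // Raised when the type is not known to the cluster's resource config
          System.err.println("Unknown resource type: " + entry.getKey());
        }
      }
      System.out.println("Requested capability: " + capability);
    }
  }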

http://git-wip-us.apache.org/repos/asf/hadoop/blob/01f3f216/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index e299acc..ef635d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.Vector;
 import java.util.Arrays;
 
+import com.google.common.base.Joiner;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -70,7 +71,9 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -81,8 +84,11 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.slf4j.Logger;
@@ -144,7 +150,8 @@ public class Client {
   private long amMemory = DEFAULT_AM_MEMORY;
   // Amt. of virtual core resource to request for to run the App Master
   private int amVCores = DEFAULT_AM_VCORES;
-
+  // Amount of resources to request to run the App Master
+  private Map<String, Long> amResources = new HashMap<>();
   // AM resource profile
   private String amResourceProfile = "";
 
@@ -168,6 +175,9 @@ public class Client {
   private long containerMemory = DEFAULT_CONTAINER_MEMORY;
   // Amt. of virtual cores to request for container in which shell script will be executed
   private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
+  // Amt. of resources to request for container
+  // in which shell script will be executed
+  private Map<String, Long> containerResources = new HashMap<>();
   // container resource profile
   private String containerResourceProfile = "";
   // No. of containers in which the shell script needs to be executed
@@ -265,6 +275,8 @@ public class Client {
 
   Client(String appMasterMainClass, Configuration conf) {
     this.conf = conf;
+    this.conf.setBoolean(
+        YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true);
     this.appMasterMainClass = appMasterMainClass;
     yarnClient = YarnClient.createYarnClient();
     yarnClient.init(conf);
@@ -274,7 +286,12 @@ public class Client {
     opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
     opts.addOption("timeout", true, "Application timeout in milliseconds");
     opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
-    opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
+    opts.addOption("master_vcores", true, "Amount of virtual cores " +
+        "to be requested to run the application master");
+    opts.addOption("master_resources", true, "Amount of resources " +
+        "to be requested to run the application master. " +
+        "Specified as resource type=value pairs separated by commas." +
+        "E.g. -master_resources memory-mb=512,vcores=2");
     opts.addOption("jar", true, "Jar file containing the application master");
     opts.addOption("master_resource_profile", true, "Resource profile for the application master");
     opts.addOption("shell_command", true, "Shell command to be executed by " +
@@ -290,8 +307,14 @@ public class Client {
     opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
     opts.addOption("container_type", true,
         "Container execution type, GUARANTEED or OPPORTUNISTIC");
-    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
-    opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
+    opts.addOption("container_memory", true, "Amount of memory in MB " +
+        "to be requested to run the shell command");
+    opts.addOption("container_vcores", true, "Amount of virtual cores " +
+        "to be requested to run the shell command");
+    opts.addOption("container_resources", true, "Amount of resources " +
+        "to be requested to run the shell command. " +
+        "Specified as resource type=value pairs separated by commas. " +
+        "E.g. -container_resources memory-mb=256,vcores=1");
     opts.addOption("container_resource_profile", true, "Resource profile for the shell command");
     opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
     opts.addOption("promote_opportunistic_after_start", false,
@@ -403,6 +426,19 @@ public class Client {
         Integer.parseInt(cliParser.getOptionValue("master_memory", "-1"));
     amVCores =
         Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1"));
+    if (cliParser.hasOption("master_resources")) {
+      Map<String, Long> masterResources =
+          parseResourcesString(cliParser.getOptionValue("master_resources"));
+      for (Map.Entry<String, Long> entry : masterResources.entrySet()) {
+        if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) {
+          amMemory = entry.getValue();
+        } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) {
+          amVCores = entry.getValue().intValue();
+        } else {
+          amResources.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
     amResourceProfile = cliParser.getOptionValue("master_resource_profile", "");
 
     if (!cliParser.hasOption("jar")) {
@@ -461,6 +497,19 @@ public class Client {
         Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
     containerVirtualCores =
         Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1"));
+    if (cliParser.hasOption("container_resources")) {
+      Map<String, Long> resources =
+          parseResourcesString(cliParser.getOptionValue("container_resources"));
+      for (Map.Entry<String, Long> entry : resources.entrySet()) {
+        if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) {
+          containerMemory = entry.getValue();
+        } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) {
+          containerVirtualCores = entry.getValue().intValue();
+        } else {
+          containerResources.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
     containerResourceProfile =
         cliParser.getOptionValue("container_resource_profile", "");
     numContainers =
@@ -637,9 +686,9 @@ public class Client {
     // Set up resource type requirements
     // For now, both memory and vcores are supported, so we set memory and
     // vcores requirements
-    setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile,
-        amPriority, profiles);
-    setContainerResources(containerMemory, containerVirtualCores, profiles);
+    List<ResourceTypeInfo> resourceTypes = yarnClient.getResourceTypeInfo();
+    setAMResourceCapability(appContext, profiles, resourceTypes);
+    setContainerResources(profiles, resourceTypes);
 
     appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
     appContext.setApplicationName(appName);
@@ -776,6 +825,10 @@ public class Client {
     if (containerVirtualCores > 0) {
       vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
     }
+    if (!containerResources.isEmpty()) {
+      Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("=");
+      vargs.add("--container_resources " + joiner.join(containerResources));
+    }
     if (containerResourceProfile != null && !containerResourceProfile
         .isEmpty()) {
       vargs.add("--container_resource_profile " + containerResourceProfile);
@@ -1011,25 +1064,26 @@ public class Client {
   }
 
   private void setAMResourceCapability(ApplicationSubmissionContext appContext,
-      long memory, int vcores, String profile, int priority,
-      Map<String, Resource> profiles) throws IllegalArgumentException {
-    if (memory < -1 || memory == 0) {
+      Map<String, Resource> profiles, List<ResourceTypeInfo> resourceTypes)
+      throws IllegalArgumentException, IOException, YarnException {
+    if (amMemory < -1 || amMemory == 0) {
       throw new IllegalArgumentException("Invalid memory specified for"
-          + " application master, exiting. Specified memory=" + memory);
+          + " application master, exiting. Specified memory=" + amMemory);
     }
-    if (vcores < -1 || vcores == 0) {
+    if (amVCores < -1 || amVCores == 0) {
       throw new IllegalArgumentException("Invalid virtual cores specified for"
-          + " application master, exiting. Specified virtual cores=" + vcores);
+          + " application master, exiting. " +
+          "Specified virtual cores=" + amVCores);
     }
-    String tmp = profile;
-    if (profile.isEmpty()) {
+    String tmp = amResourceProfile;
+    if (amResourceProfile.isEmpty()) {
       tmp = "default";
     }
     if (appContext.getAMContainerResourceRequests() == null) {
       List<ResourceRequest> amResourceRequests = new ArrayList<ResourceRequest>();
       amResourceRequests
-          .add(ResourceRequest.newInstance(Priority.newInstance(priority), "*",
-              Resources.clone(Resources.none()), 1));
+          .add(ResourceRequest.newInstance(Priority.newInstance(amPriority),
+              "*", Resources.clone(Resources.none()), 1));
       appContext.setAMContainerResourceRequests(amResourceRequests);
     }
 
@@ -1038,36 +1092,90 @@ public class Client {
       appContext.getAMContainerResourceRequests().get(0).setProfileCapability(
           ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0)));
     }
+
     Resource capability = Resource.newInstance(0, 0);
+
+    validateResourceTypes(amResources.keySet(), resourceTypes);
+    for (Map.Entry<String, Long> entry : amResources.entrySet()) {
+      capability.setResourceValue(entry.getKey(), entry.getValue());
+    }
     // set amMemory because it's used to set Xmx param
-    if (profiles == null) {
-      amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory;
-      amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores;
-      capability.setMemorySize(amMemory);
-      capability.setVirtualCores(amVCores);
-    } else {
-      amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory;
-      amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores;
-      capability.setMemorySize(memory);
-      capability.setVirtualCores(vcores);
+    if (amMemory == -1) {
+      amMemory = (profiles == null) ? DEFAULT_AM_MEMORY :
+          profiles.get(tmp).getMemorySize();
+    }
+    if (amVCores == -1) {
+      amVCores = (profiles == null) ? DEFAULT_AM_VCORES :
+          profiles.get(tmp).getVirtualCores();
     }
+    capability.setMemorySize(amMemory);
+    capability.setVirtualCores(amVCores);
     appContext.getAMContainerResourceRequests().get(0).getProfileCapability()
         .setProfileCapabilityOverride(capability);
   }
 
-  private void setContainerResources(long memory, int vcores,
-      Map<String, Resource> profiles) throws IllegalArgumentException {
-    if (memory < -1 || memory == 0) {
-      throw new IllegalArgumentException(
-          "Container memory '" + memory + "' has to be greated than 0");
+  private void setContainerResources(Map<String, Resource> profiles,
+      List<ResourceTypeInfo> resourceTypes) throws IllegalArgumentException {
+    if (containerMemory < -1 || containerMemory == 0) {
+      throw new IllegalArgumentException("Container memory '" +
+          containerMemory + "' has to be greated than 0");
     }
-    if (vcores < -1 || vcores == 0) {
-      throw new IllegalArgumentException(
-          "Container vcores '" + vcores + "' has to be greated than 0");
+    if (containerVirtualCores < -1 || containerVirtualCores == 0) {
+      throw new IllegalArgumentException("Container vcores '" +
+          containerVirtualCores + "' has to be greated than 0");
     }
+    validateResourceTypes(containerResources.keySet(), resourceTypes);
     if (profiles == null) {
-      containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory;
-      containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores;
+      containerMemory = containerMemory == -1 ?
+          DEFAULT_CONTAINER_MEMORY : containerMemory;
+      containerVirtualCores = containerVirtualCores == -1 ?
+          DEFAULT_CONTAINER_VCORES : containerVirtualCores;
+    }
+  }
+
+  private void validateResourceTypes(Iterable<String> resourceNames,
+      List<ResourceTypeInfo> resourceTypes) {
+    for (String resourceName : resourceNames) {
+      if (!resourceTypes.stream().anyMatch(e ->
+          e.getName().equals(resourceName))) {
+        throw new ResourceNotFoundException("Unknown resource: " +
+            resourceName);
+      }
+    }
+  }
+
+  static Map<String, Long> parseResourcesString(String resourcesStr) {
+    Map<String, Long> resources = new HashMap<>();
+
+    // Ignore the grouping "[]"
+    if (resourcesStr.startsWith("[")) {
+      resourcesStr = resourcesStr.substring(1);
+    }
+    if (resourcesStr.endsWith("]")) {
+      resourcesStr = resourcesStr.substring(0, resourcesStr.length() - 1);
+    }
+
+    for (String resource : resourcesStr.trim().split(",")) {
+      resource = resource.trim();
+      if (!resource.matches("^[^=]+=\\d+\\s?\\w*$")) {
+        throw new IllegalArgumentException("\"" + resource + "\" is not a " +
+            "valid resource type/amount pair. " +
+            "Please provide key=amount pairs separated by commas.");
+      }
+      String[] splits = resource.split("=");
+      String key = splits[0], value = splits[1];
+      String units = ResourceUtils.getUnits(value);
+      String valueWithoutUnit = value.substring(
+          0, value.length() - units.length()).trim();
+      Long resourceValue = Long.valueOf(valueWithoutUnit);
+      if (!units.isEmpty()) {
+        resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
+      }
+      if (key.equals("memory")) {
+        key = ResourceInformation.MEMORY_URI;
+      }
+      resources.put(key, resourceValue);
     }
+    return resources;
   }
 }
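
Client.parseResourcesString above accepts an optional unit suffix per value (converted to Mi with UnitsConversionUtil when present) and maps the shorthand key "memory" to the canonical memory-mb type. A small sketch of how it behaves; the method is package-private, so the caller has to live in org.apache.hadoop.yarn.applications.distributedshell, and the class name here is purely illustrative:

  package org.apache.hadoop.yarn.applications.distributedshell;

  import java.util.Map;

  public class ParseResourcesSketch {
    public static void main(String[] args) {
      // "1 Gi" is converted to 1024 (Mi) and "memory" is stored as "memory-mb";
      // unit-less values such as vcores=1 are taken as-is.
      Map<String, Long> parsed =
          Client.parseResourcesString("memory=1 Gi,vcores=1");
      System.out.println(parsed);  // e.g. {memory-mb=1024, vcores=1}
    }
  }

Note that setAMResourceCapability and setContainerResources then validate every key returned here against the resource types reported by the ResourceManager and reject unknown ones with ResourceNotFoundException.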

http://git-wip-us.apache.org/repos/asf/hadoop/blob/01f3f216/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index d6bb8d6..667b60d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.cli.MissingArgumentException;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -54,11 +55,15 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -71,11 +76,11 @@ import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
 import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
 import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
 import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
@@ -89,6 +94,7 @@ import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -112,6 +118,7 @@ public class TestDistributedShell {
   private static final int NUM_NMS = 1;
   private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
   private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  private static final int MIN_ALLOCATION_MB = 128;
 
   protected final static String APPMASTER_JAR =
       JarFinder.getJar(ApplicationMaster.class);
@@ -138,7 +145,8 @@ public class TestDistributedShell {
     LOG.info("Starting up YARN cluster");
 
     conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        MIN_ALLOCATION_MB);
     // reduce the teardown waiting time
     conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
     conf.set("yarn.log.dir", "target");
@@ -1433,4 +1441,167 @@ public class TestDistributedShell {
       Assert.fail("Job execution with opportunistic containers failed.");
     }
   }
+
+  @Test
+  @TimelineVersion(2.0f)
+  public void testDistributedShellWithResources() throws Exception {
+    doTestDistributedShellWithResources(false);
+  }
+
+  @Test
+  @TimelineVersion(2.0f)
+  public void testDistributedShellWithResourcesWithLargeContainers()
+      throws Exception {
+    doTestDistributedShellWithResources(true);
+  }
+
+  public void doTestDistributedShellWithResources(boolean largeContainers)
+      throws Exception {
+    Resource clusterResource = yarnCluster.getResourceManager()
+        .getResourceScheduler().getClusterResource();
+    String masterMemoryString = "1 Gi";
+    String containerMemoryString = "512 Mi";
+    long masterMemory = 1024;
+    long containerMemory = 512;
+    Assume.assumeTrue("The cluster doesn't have enough memory for this test",
+        clusterResource.getMemorySize() >= masterMemory + containerMemory);
+    Assume.assumeTrue("The cluster doesn't have enough cores for this test",
+        clusterResource.getVirtualCores() >= 2);
+    if (largeContainers) {
+      masterMemory = clusterResource.getMemorySize() * 2 / 3;
+      masterMemory = masterMemory - masterMemory % MIN_ALLOCATION_MB;
+      masterMemoryString = masterMemory + "Mi";
+      containerMemory = clusterResource.getMemorySize() / 3;
+      containerMemory = containerMemory - containerMemory % MIN_ALLOCATION_MB;
+      containerMemoryString = String.valueOf(containerMemory);
+    }
+
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        Shell.WINDOWS ? "dir" : "ls",
+        "--master_resources",
+        "memory=" + masterMemoryString + ",vcores=1",
+        "--container_resources",
+        "memory=" + containerMemoryString + ",vcores=1",
+    };
+
+    LOG.info("Initializing DS Client");
+    Client client = new Client(new Configuration(yarnCluster.getConfig()));
+    Assert.assertTrue(client.init(args));
+    LOG.info("Running DS Client");
+    final AtomicBoolean result = new AtomicBoolean(false);
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          result.set(client.run());
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    t.start();
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(new Configuration(yarnCluster.getConfig()));
+    yarnClient.start();
+
+    while (true) {
+      List<ApplicationReport> apps = yarnClient.getApplications();
+      if (apps.isEmpty()) {
+        Thread.sleep(10);
+        continue;
+      }
+      ApplicationReport appReport = apps.get(0);
+      ApplicationId appId = appReport.getApplicationId();
+      List<ApplicationAttemptReport> appAttempts =
+          yarnClient.getApplicationAttempts(appId);
+      if (appAttempts.isEmpty()) {
+        Thread.sleep(10);
+        continue;
+      }
+      ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
+      ContainerId amContainerId = appAttemptReport.getAMContainerId();
+
+      if (amContainerId == null) {
+        Thread.sleep(10);
+        continue;
+      }
+      ContainerReport report = yarnClient.getContainerReport(amContainerId);
+      Resource masterResource = report.getAllocatedResource();
+      Assert.assertEquals(masterMemory, masterResource.getMemorySize());
+      Assert.assertEquals(1, masterResource.getVirtualCores());
+
+      List<ContainerReport> containers =
+          yarnClient.getContainers(appAttemptReport.getApplicationAttemptId());
+      if (containers.size() < 2) {
+        Thread.sleep(10);
+        continue;
+      }
+      for (ContainerReport container : containers) {
+        if (!container.getContainerId().equals(amContainerId)) {
+          Resource containerResource = container.getAllocatedResource();
+          Assert.assertEquals(containerMemory,
+              containerResource.getMemorySize());
+          Assert.assertEquals(1, containerResource.getVirtualCores());
+        }
+      }
+
+      return;
+    }
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void testDistributedShellAMResourcesWithIllegalArguments()
+      throws Exception {
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "1",
+        "--shell_command",
+        Shell.WINDOWS ? "dir" : "ls",
+        "--master_resources",
+        "memory-mb=invalid"
+    };
+    Client client = new Client(new Configuration(yarnCluster.getConfig()));
+    client.init(args);
+  }
+
+  @Test(expected=MissingArgumentException.class)
+  public void testDistributedShellAMResourcesWithMissingArgumentValue()
+      throws Exception {
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "1",
+        "--shell_command",
+        Shell.WINDOWS ? "dir" : "ls",
+        "--master_resources"
+    };
+    Client client = new Client(new Configuration(yarnCluster.getConfig()));
+    client.init(args);
+  }
+
+  @Test(expected=ResourceNotFoundException.class)
+  public void testDistributedShellAMResourcesWithUnknownResource()
+      throws Exception {
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "1",
+        "--shell_command",
+        Shell.WINDOWS ? "dir" : "ls",
+        "--master_resources",
+        "unknown-resource=5"
+    };
+    Client client = new Client(new Configuration(yarnCluster.getConfig()));
+    client.init(args);
+    client.run();
+  }
 }
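
To make the largeContainers branch of doTestDistributedShellWithResources concrete: on, say, an 8192 MB cluster with MIN_ALLOCATION_MB = 128, masterMemory becomes 8192 * 2 / 3 = 5461, rounded down to the nearest multiple of 128, i.e. 5376, and containerMemory becomes 8192 / 3 = 2730, rounded down to 2688. Those values are passed through -master_resources and -container_resources and then asserted against the allocated resources reported for the AM and task containers.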

