You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/01/08 06:29:37 UTC
hadoop git commit: YARN-7242. Support to specify values of different resource types in DistributedShell for easier testing. Contributed by Gergely Novák.
Repository: hadoop
Updated Branches:
refs/heads/trunk c2d6fa365 -> 01f3f2167
YARN-7242. Support to specify values of different resource types in DistributedShell for easier testing. Contributed by Gergely Novák.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/01f3f216
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/01f3f216
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/01f3f216
Branch: refs/heads/trunk
Commit: 01f3f2167ec20b52a18bc2cf250fb4229cfd2c14
Parents: c2d6fa3
Author: Sunil G <su...@apache.org>
Authored: Mon Jan 8 11:59:06 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Mon Jan 8 11:59:06 2018 +0530
----------------------------------------------------------------------
.../distributedshell/ApplicationMaster.java | 20 ++
.../applications/distributedshell/Client.java | 182 +++++++++++++++----
.../distributedshell/TestDistributedShell.java | 175 +++++++++++++++++-
3 files changed, 338 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/01f3f216/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index b3fa0ff..85496d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager;
@@ -244,6 +245,9 @@ public class ApplicationMaster {
// VirtualCores to request for the container on which the shell command will run
private static final int DEFAULT_CONTAINER_VCORES = 1;
private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
+ // All other resources to request for the container
+ // on which the shell command will run
+ private Map<String, Long> containerResources = new HashMap<>();
// Priority of the request
private int requestPriority;
// Execution type of the containers.
@@ -431,6 +435,10 @@ public class ApplicationMaster {
"Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true,
"Amount of virtual cores to be requested to run the shell command");
+ opts.addOption("container_resources", true,
+ "Amount of resources to be requested to run the shell command. " +
+ "Specified as resource type=value pairs separated by commas. " +
+ "E.g. -container_resources memory-mb=512,vcores=1");
opts.addOption("container_resource_profile", true,
"Resource profile to be requested to run the shell command");
opts.addOption("num_containers", true,
@@ -590,6 +598,14 @@ public class ApplicationMaster {
"container_memory", "-1"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
"container_vcores", "-1"));
+ containerResources = new HashMap<>();
+ if (cliParser.hasOption("container_resources")) {
+ Map<String, Long> resources = Client.parseResourcesString(
+ cliParser.getOptionValue("container_resources"));
+ for (Map.Entry<String, Long> entry : resources.entrySet()) {
+ containerResources.put(entry.getKey(), entry.getValue());
+ }
+ }
containerResourceProfile =
cliParser.getOptionValue("container_resource_profile", "");
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
@@ -711,6 +727,7 @@ public class ApplicationMaster {
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
resourceProfiles = response.getResourceProfiles();
+ ResourceUtils.reinitializeResources(response.getResourceTypes());
// Dump out information about cluster capability as seen by the
// resource manager
long maxMem = response.getMaximumResourceCapability().getMemorySize();
@@ -1593,6 +1610,9 @@ public class ApplicationMaster {
containerVirtualCores;
resourceCapability.setMemorySize(containerMemory);
resourceCapability.setVirtualCores(containerVirtualCores);
+ for (Map.Entry<String, Long> entry : containerResources.entrySet()) {
+ resourceCapability.setResourceValue(entry.getKey(), entry.getValue());
+ }
}
String profileName = containerResourceProfile;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/01f3f216/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index e299acc..ef635d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.Vector;
import java.util.Arrays;
+import com.google.common.base.Joiner;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -70,7 +71,9 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -81,8 +84,11 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.slf4j.Logger;
@@ -144,7 +150,8 @@ public class Client {
private long amMemory = DEFAULT_AM_MEMORY;
// Amt. of virtual core resource to request for to run the App Master
private int amVCores = DEFAULT_AM_VCORES;
-
+ // Amount of resources to request to run the App Master
+ private Map<String, Long> amResources = new HashMap<>();
// AM resource profile
private String amResourceProfile = "";
@@ -168,6 +175,9 @@ public class Client {
private long containerMemory = DEFAULT_CONTAINER_MEMORY;
// Amt. of virtual cores to request for container in which shell script will be executed
private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
+ // Amt. of resources to request for container
+ // in which shell script will be executed
+ private Map<String, Long> containerResources = new HashMap<>();
// container resource profile
private String containerResourceProfile = "";
// No. of containers in which the shell script needs to be executed
@@ -265,6 +275,8 @@ public class Client {
Client(String appMasterMainClass, Configuration conf) {
this.conf = conf;
+ this.conf.setBoolean(
+ YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true);
this.appMasterMainClass = appMasterMainClass;
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
@@ -274,7 +286,12 @@ public class Client {
opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
opts.addOption("timeout", true, "Application timeout in milliseconds");
opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
- opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
+ opts.addOption("master_vcores", true, "Amount of virtual cores " +
+ "to be requested to run the application master");
+ opts.addOption("master_resources", true, "Amount of resources " +
+ "to be requested to run the application master. " +
+ "Specified as resource type=value pairs separated by commas." +
+ "E.g. -master_resources memory-mb=512,vcores=2");
opts.addOption("jar", true, "Jar file containing the application master");
opts.addOption("master_resource_profile", true, "Resource profile for the application master");
opts.addOption("shell_command", true, "Shell command to be executed by " +
@@ -290,8 +307,14 @@ public class Client {
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_type", true,
"Container execution type, GUARANTEED or OPPORTUNISTIC");
- opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
- opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
+ opts.addOption("container_memory", true, "Amount of memory in MB " +
+ "to be requested to run the shell command");
+ opts.addOption("container_vcores", true, "Amount of virtual cores " +
+ "to be requested to run the shell command");
+ opts.addOption("container_resources", true, "Amount of resources " +
+ "to be requested to run the shell command. " +
+ "Specified as resource type=value pairs separated by commas. " +
+ "E.g. -container_resources memory-mb=256,vcores=1");
opts.addOption("container_resource_profile", true, "Resource profile for the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("promote_opportunistic_after_start", false,
@@ -403,6 +426,19 @@ public class Client {
Integer.parseInt(cliParser.getOptionValue("master_memory", "-1"));
amVCores =
Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1"));
+ if (cliParser.hasOption("master_resources")) {
+ Map<String, Long> masterResources =
+ parseResourcesString(cliParser.getOptionValue("master_resources"));
+ for (Map.Entry<String, Long> entry : masterResources.entrySet()) {
+ if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) {
+ amMemory = entry.getValue();
+ } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) {
+ amVCores = entry.getValue().intValue();
+ } else {
+ amResources.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
amResourceProfile = cliParser.getOptionValue("master_resource_profile", "");
if (!cliParser.hasOption("jar")) {
@@ -461,6 +497,19 @@ public class Client {
Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
containerVirtualCores =
Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1"));
+ if (cliParser.hasOption("container_resources")) {
+ Map<String, Long> resources =
+ parseResourcesString(cliParser.getOptionValue("container_resources"));
+ for (Map.Entry<String, Long> entry : resources.entrySet()) {
+ if (entry.getKey().equals(ResourceInformation.MEMORY_URI)) {
+ containerMemory = entry.getValue();
+ } else if (entry.getKey().equals(ResourceInformation.VCORES_URI)) {
+ containerVirtualCores = entry.getValue().intValue();
+ } else {
+ containerResources.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
containerResourceProfile =
cliParser.getOptionValue("container_resource_profile", "");
numContainers =
@@ -637,9 +686,9 @@ public class Client {
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
- setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile,
- amPriority, profiles);
- setContainerResources(containerMemory, containerVirtualCores, profiles);
+ List<ResourceTypeInfo> resourceTypes = yarnClient.getResourceTypeInfo();
+ setAMResourceCapability(appContext, profiles, resourceTypes);
+ setContainerResources(profiles, resourceTypes);
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
@@ -776,6 +825,10 @@ public class Client {
if (containerVirtualCores > 0) {
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
}
+ if (!containerResources.isEmpty()) {
+ Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("=");
+ vargs.add("--container_resources " + joiner.join(containerResources));
+ }
if (containerResourceProfile != null && !containerResourceProfile
.isEmpty()) {
vargs.add("--container_resource_profile " + containerResourceProfile);
@@ -1011,25 +1064,26 @@ public class Client {
}
private void setAMResourceCapability(ApplicationSubmissionContext appContext,
- long memory, int vcores, String profile, int priority,
- Map<String, Resource> profiles) throws IllegalArgumentException {
- if (memory < -1 || memory == 0) {
+ Map<String, Resource> profiles, List<ResourceTypeInfo> resourceTypes)
+ throws IllegalArgumentException, IOException, YarnException {
+ if (amMemory < -1 || amMemory == 0) {
throw new IllegalArgumentException("Invalid memory specified for"
- + " application master, exiting. Specified memory=" + memory);
+ + " application master, exiting. Specified memory=" + amMemory);
}
- if (vcores < -1 || vcores == 0) {
+ if (amVCores < -1 || amVCores == 0) {
throw new IllegalArgumentException("Invalid virtual cores specified for"
- + " application master, exiting. Specified virtual cores=" + vcores);
+ + " application master, exiting. " +
+ "Specified virtual cores=" + amVCores);
}
- String tmp = profile;
- if (profile.isEmpty()) {
+ String tmp = amResourceProfile;
+ if (amResourceProfile.isEmpty()) {
tmp = "default";
}
if (appContext.getAMContainerResourceRequests() == null) {
List<ResourceRequest> amResourceRequests = new ArrayList<ResourceRequest>();
amResourceRequests
- .add(ResourceRequest.newInstance(Priority.newInstance(priority), "*",
- Resources.clone(Resources.none()), 1));
+ .add(ResourceRequest.newInstance(Priority.newInstance(amPriority),
+ "*", Resources.clone(Resources.none()), 1));
appContext.setAMContainerResourceRequests(amResourceRequests);
}
@@ -1038,36 +1092,90 @@ public class Client {
appContext.getAMContainerResourceRequests().get(0).setProfileCapability(
ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0)));
}
+
Resource capability = Resource.newInstance(0, 0);
+
+ validateResourceTypes(amResources.keySet(), resourceTypes);
+ for (Map.Entry<String, Long> entry : amResources.entrySet()) {
+ capability.setResourceValue(entry.getKey(), entry.getValue());
+ }
// set amMemory because it's used to set Xmx param
- if (profiles == null) {
- amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory;
- amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores;
- capability.setMemorySize(amMemory);
- capability.setVirtualCores(amVCores);
- } else {
- amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory;
- amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores;
- capability.setMemorySize(memory);
- capability.setVirtualCores(vcores);
+ if (amMemory == -1) {
+ amMemory = (profiles == null) ? DEFAULT_AM_MEMORY :
+ profiles.get(tmp).getMemorySize();
+ }
+ if (amVCores == -1) {
+ amVCores = (profiles == null) ? DEFAULT_AM_VCORES :
+ profiles.get(tmp).getVirtualCores();
}
+ capability.setMemorySize(amMemory);
+ capability.setVirtualCores(amVCores);
appContext.getAMContainerResourceRequests().get(0).getProfileCapability()
.setProfileCapabilityOverride(capability);
}
- private void setContainerResources(long memory, int vcores,
- Map<String, Resource> profiles) throws IllegalArgumentException {
- if (memory < -1 || memory == 0) {
- throw new IllegalArgumentException(
- "Container memory '" + memory + "' has to be greated than 0");
+ private void setContainerResources(Map<String, Resource> profiles,
+ List<ResourceTypeInfo> resourceTypes) throws IllegalArgumentException {
+ if (containerMemory < -1 || containerMemory == 0) {
+ throw new IllegalArgumentException("Container memory '" +
+ containerMemory + "' has to be greated than 0");
}
- if (vcores < -1 || vcores == 0) {
- throw new IllegalArgumentException(
- "Container vcores '" + vcores + "' has to be greated than 0");
+ if (containerVirtualCores < -1 || containerVirtualCores == 0) {
+ throw new IllegalArgumentException("Container vcores '" +
+ containerVirtualCores + "' has to be greated than 0");
}
+ validateResourceTypes(containerResources.keySet(), resourceTypes);
if (profiles == null) {
- containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory;
- containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores;
+ containerMemory = containerMemory == -1 ?
+ DEFAULT_CONTAINER_MEMORY : containerMemory;
+ containerVirtualCores = containerVirtualCores == -1 ?
+ DEFAULT_CONTAINER_VCORES : containerVirtualCores;
+ }
+ }
+
+  /**
+   * Checks that every requested resource name is one of the given
+   * resource types.
+   *
+   * @param resourceNames names of the resources that were requested
+   * @param resourceTypes resource types to validate the names against
+   * @throws ResourceNotFoundException if a requested name matches none of
+   *     the given resource types
+   */
+  private void validateResourceTypes(Iterable<String> resourceNames,
+      List<ResourceTypeInfo> resourceTypes) {
+    for (String requested : resourceNames) {
+      boolean known = false;
+      for (ResourceTypeInfo typeInfo : resourceTypes) {
+        if (typeInfo.getName().equals(requested)) {
+          known = true;
+          break;
+        }
+      }
+      if (!known) {
+        throw new ResourceNotFoundException("Unknown resource: " + requested);
+      }
+    }
+  }
+
+  /**
+   * Parses a resources string of the form
+   * {@code key1=amount1,key2=amount2}, optionally wrapped in "[]".
+   * An amount may carry a unit suffix (e.g. "512 Mi"); amounts with a unit
+   * are converted to "Mi". The key "memory" is stored under
+   * {@link ResourceInformation#MEMORY_URI}.
+   *
+   * @param resourcesStr the comma-separated key=amount pairs
+   * @return a map from resource name to the parsed amount
+   * @throws IllegalArgumentException if a pair is not of the form key=amount
+   */
+  static Map<String, Long> parseResourcesString(String resourcesStr) {
+    Map<String, Long> resources = new HashMap<>();
+
+    // Ignore the grouping "[]"
+    if (resourcesStr.startsWith("[")) {
+      resourcesStr = resourcesStr.substring(1);
+    }
+    if (resourcesStr.endsWith("]")) {
+      // The end index of substring is exclusive, so length() - 1 is needed
+      // to actually drop the closing bracket.
+      resourcesStr = resourcesStr.substring(0, resourcesStr.length() - 1);
+    }
+
+    for (String resource : resourcesStr.trim().split(",")) {
+      resource = resource.trim();
+      // key, '=', digits, an optional single whitespace and an optional unit
+      if (!resource.matches("^[^=]+=\\d+\\s?\\w*$")) {
+        throw new IllegalArgumentException("\"" + resource + "\" is not a " +
+            "valid resource type/amount pair. " +
+            "Please provide key=amount pairs separated by commas.");
+      }
+      String[] splits = resource.split("=");
+      String key = splits[0], value = splits[1];
+      // The units are treated as a suffix of the value and cut off before
+      // the numeric part is parsed.
+      String units = ResourceUtils.getUnits(value);
+      String valueWithoutUnit = value.substring(
+          0, value.length() - units.length()).trim();
+      Long resourceValue = Long.valueOf(valueWithoutUnit);
+      if (!units.isEmpty()) {
+        // NOTE(review): every amount that carries a unit is converted to
+        // "Mi", not only memory - confirm this is intended for other
+        // resource types.
+        resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
+      }
+      // Accept "memory" as a shorthand for the memory resource name.
+      if (key.equals("memory")) {
+        key = ResourceInformation.MEMORY_URI;
+      }
+      resources.put(key, resourceValue);
+    }
+    return resources;
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/01f3f216/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index d6bb8d6..667b60d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -54,11 +55,15 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -71,11 +76,11 @@ import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
@@ -89,6 +94,7 @@ import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -112,6 +118,7 @@ public class TestDistributedShell {
private static final int NUM_NMS = 1;
private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+ private static final int MIN_ALLOCATION_MB = 128;
protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class);
@@ -138,7 +145,8 @@ public class TestDistributedShell {
LOG.info("Starting up YARN cluster");
conf = new YarnConfiguration();
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ MIN_ALLOCATION_MB);
// reduce the teardown waiting time
conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
conf.set("yarn.log.dir", "target");
@@ -1433,4 +1441,167 @@ public class TestDistributedShell {
Assert.fail("Job execution with opportunistic containers failed.");
}
}
+
+  /**
+   * Runs the resources test with the fixed "1 Gi" master and "512 Mi"
+   * container memory requests.
+   */
+  @Test
+  @TimelineVersion(2.0f)
+  public void testDistributedShellWithResources() throws Exception {
+    final boolean largeContainers = false;
+    doTestDistributedShellWithResources(largeContainers);
+  }
+
+  /**
+   * Runs the resources test with memory requests that are derived from the
+   * total memory of the cluster instead of the fixed sizes.
+   */
+  @Test
+  @TimelineVersion(2.0f)
+  public void testDistributedShellWithResourcesWithLargeContainers()
+      throws Exception {
+    final boolean largeContainers = true;
+    doTestDistributedShellWithResources(largeContainers);
+  }
+
+  /**
+   * Runs the distributed shell with {@code --master_resources} and
+   * {@code --container_resources} and checks the memory and vcores that
+   * were allocated to the AM container and to the shell containers.
+   * The test is skipped (via Assume) when the cluster has less than
+   * 1536 MB of memory or fewer than 2 vcores.
+   *
+   * @param largeContainers when true, request about two thirds of the
+   *     cluster memory for the AM and about one third for each shell
+   *     container (both rounded down to a multiple of MIN_ALLOCATION_MB)
+   *     instead of the fixed "1 Gi" / "512 Mi" requests
+   * @throws Exception if the client or any YARN client call fails
+   */
+  public void doTestDistributedShellWithResources(boolean largeContainers)
+      throws Exception {
+    Resource clusterResource = yarnCluster.getResourceManager()
+        .getResourceScheduler().getClusterResource();
+    // The default strings use a unit that is separated by a space.
+    String masterMemoryString = "1 Gi";
+    String containerMemoryString = "512 Mi";
+    long masterMemory = 1024;
+    long containerMemory = 512;
+    Assume.assumeTrue("The cluster doesn't have enough memory for this test",
+        clusterResource.getMemorySize() >= masterMemory + containerMemory);
+    Assume.assumeTrue("The cluster doesn't have enough cores for this test",
+        clusterResource.getVirtualCores() >= 2);
+    if (largeContainers) {
+      masterMemory = clusterResource.getMemorySize() * 2 / 3;
+      masterMemory = masterMemory - masterMemory % MIN_ALLOCATION_MB;
+      // Unit directly attached to the number.
+      masterMemoryString = masterMemory + "Mi";
+      containerMemory = clusterResource.getMemorySize() / 3;
+      containerMemory = containerMemory - containerMemory % MIN_ALLOCATION_MB;
+      // No unit at all.
+      containerMemoryString = String.valueOf(containerMemory);
+    }
+
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        Shell.WINDOWS ? "dir" : "ls",
+        "--master_resources",
+        "memory=" + masterMemoryString + ",vcores=1",
+        "--container_resources",
+        "memory=" + containerMemoryString + ",vcores=1",
+    };
+
+    LOG.info("Initializing DS Client");
+    Client client = new Client(new Configuration(yarnCluster.getConfig()));
+    Assert.assertTrue(client.init(args));
+    LOG.info("Running DS Client");
+    final AtomicBoolean result = new AtomicBoolean(false);
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          result.set(client.run());
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    t.start();
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(new Configuration(yarnCluster.getConfig()));
+    yarnClient.start();
+
+    // Stop the polling client on every exit path (success, assertion
+    // failure or exception) so that the started client is not leaked.
+    try {
+      // Poll until the application, its attempt, the AM container and at
+      // least two containers are visible, then verify the allocations.
+      while (true) {
+        List<ApplicationReport> apps = yarnClient.getApplications();
+        if (apps.isEmpty()) {
+          Thread.sleep(10);
+          continue;
+        }
+        ApplicationReport appReport = apps.get(0);
+        ApplicationId appId = appReport.getApplicationId();
+        List<ApplicationAttemptReport> appAttempts =
+            yarnClient.getApplicationAttempts(appId);
+        if (appAttempts.isEmpty()) {
+          Thread.sleep(10);
+          continue;
+        }
+        ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
+        ContainerId amContainerId = appAttemptReport.getAMContainerId();
+
+        if (amContainerId == null) {
+          Thread.sleep(10);
+          continue;
+        }
+        ContainerReport report = yarnClient.getContainerReport(amContainerId);
+        Resource masterResource = report.getAllocatedResource();
+        Assert.assertEquals(masterMemory, masterResource.getMemorySize());
+        Assert.assertEquals(1, masterResource.getVirtualCores());
+
+        List<ContainerReport> containers = yarnClient.getContainers(
+            appAttemptReport.getApplicationAttemptId());
+        if (containers.size() < 2) {
+          Thread.sleep(10);
+          continue;
+        }
+        // Every container other than the AM is a shell container.
+        for (ContainerReport container : containers) {
+          if (!container.getContainerId().equals(amContainerId)) {
+            Resource containerResource = container.getAllocatedResource();
+            Assert.assertEquals(containerMemory,
+                containerResource.getMemorySize());
+            Assert.assertEquals(1, containerResource.getVirtualCores());
+          }
+        }
+
+        return;
+      }
+    } finally {
+      yarnClient.stop();
+    }
+  }
+
+  /**
+   * Initializing the client with a non-numeric amount in
+   * {@code --master_resources} is expected to fail with an
+   * IllegalArgumentException.
+   */
+  @Test(expected=IllegalArgumentException.class)
+  public void testDistributedShellAMResourcesWithIllegalArguments()
+      throws Exception {
+    final String listCommand = Shell.WINDOWS ? "dir" : "ls";
+    final String[] args = {
+        "--jar", APPMASTER_JAR,
+        "--num_containers", "1",
+        "--shell_command", listCommand,
+        "--master_resources", "memory-mb=invalid"
+    };
+    new Client(new Configuration(yarnCluster.getConfig())).init(args);
+  }
+
+  /**
+   * Initializing the client with {@code --master_resources} as the last
+   * argument and no value after it is expected to fail with a
+   * MissingArgumentException.
+   */
+  @Test(expected=MissingArgumentException.class)
+  public void testDistributedShellAMResourcesWithMissingArgumentValue()
+      throws Exception {
+    final String listCommand = Shell.WINDOWS ? "dir" : "ls";
+    final String[] args = {
+        "--jar", APPMASTER_JAR,
+        "--num_containers", "1",
+        "--shell_command", listCommand,
+        "--master_resources"
+    };
+    new Client(new Configuration(yarnCluster.getConfig())).init(args);
+  }
+
+  /**
+   * Initializing and running the client with a resource name in
+   * {@code --master_resources} that is not known is expected to fail with
+   * a ResourceNotFoundException.
+   */
+  @Test(expected=ResourceNotFoundException.class)
+  public void testDistributedShellAMResourcesWithUnknownResource()
+      throws Exception {
+    final String listCommand = Shell.WINDOWS ? "dir" : "ls";
+    final String[] args = {
+        "--jar", APPMASTER_JAR,
+        "--num_containers", "1",
+        "--shell_command", listCommand,
+        "--master_resources", "unknown-resource=5"
+    };
+    final Client dsClient =
+        new Client(new Configuration(yarnCluster.getConfig()));
+    dsClient.init(args);
+    dsClient.run();
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org