You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/22 00:17:54 UTC
[24/45] hadoop git commit: YARN-5588. [YARN-3926] Add support for
resource profiles in distributed shell. Contributed by Varun Vasudev.
YARN-5588. [YARN-3926] Add support for resource profiles in distributed shell. Contributed by Varun Vasudev.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b6800253
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b6800253
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b6800253
Branch: refs/heads/YARN-3926
Commit: b6800253120388333028674f6252f660f8e4fc82
Parents: c9e1ed8
Author: Sunil G <su...@apache.org>
Authored: Mon Feb 27 21:44:14 2017 +0530
Committer: Wangda Tan <wa...@apache.org>
Committed: Mon Aug 21 16:52:55 2017 -0700
----------------------------------------------------------------------
.../yarn/api/records/ProfileCapability.java | 16 +-
.../ResourceProfilesNotEnabledException.java | 43 +++++
.../distributedshell/ApplicationMaster.java | 61 +++++--
.../applications/distributedshell/Client.java | 174 +++++++++++++++----
.../distributedshell/TestDistributedShell.java | 29 ++++
.../yarn/client/api/impl/TestAMRMClient.java | 2 +-
.../server/resourcemanager/ClientRMService.java | 4 +-
.../resource/ResourceProfilesManagerImpl.java | 6 +-
.../scheduler/ClusterNodeTracker.java | 12 +-
9 files changed, 288 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6800253/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
index faaddd5..1a8d1c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
@@ -150,17 +150,21 @@ public abstract class ProfileCapability {
.checkArgument(capability != null, "Capability cannot be null");
Preconditions.checkArgument(resourceProfilesMap != null,
"Resource profiles map cannot be null");
+ Resource none = Resource.newInstance(0, 0);
Resource resource = Resource.newInstance(0, 0);
-
- if (resourceProfilesMap.containsKey(capability.getProfileName())) {
- resource = Resource
- .newInstance(resourceProfilesMap.get(capability.getProfileName()));
+ String profileName = capability.getProfileName();
+ if (profileName.isEmpty()) {
+ profileName = DEFAULT_PROFILE;
+ }
+ if (resourceProfilesMap.containsKey(profileName)) {
+ resource = Resource.newInstance(resourceProfilesMap.get(profileName));
}
- if(capability.getProfileCapabilityOverride()!= null) {
+ if (capability.getProfileCapabilityOverride() != null &&
+ !capability.getProfileCapabilityOverride().equals(none)) {
for (Map.Entry<String, ResourceInformation> entry : capability
.getProfileCapabilityOverride().getResources().entrySet()) {
- if (entry.getValue() != null && entry.getValue().getValue() != 0) {
+ if (entry.getValue() != null && entry.getValue().getValue() >= 0) {
resource.setResourceInformation(entry.getKey(), entry.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6800253/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java
new file mode 100644
index 0000000..558e075
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+/**
+ * This exception is thrown when the client requests information about
+ * ResourceProfiles in the
+ * {@link org.apache.hadoop.yarn.api.ApplicationClientProtocol} but resource
+ * profiles is not enabled on the RM.
+ *
+ */
+public class ResourceProfilesNotEnabledException extends YarnException {
+
+ private static final long serialVersionUID = 13498237L;
+
+ public ResourceProfilesNotEnabledException(Throwable cause) {
+ super(cause);
+ }
+
+ public ResourceProfilesNotEnabledException(String message) {
+ super(message);
+ }
+
+ public ResourceProfilesNotEnabledException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6800253/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index ab4607a..7bddb41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
@@ -103,6 +104,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.LogManager;
@@ -229,12 +231,18 @@ public class ApplicationMaster {
@VisibleForTesting
protected int numTotalContainers = 1;
// Memory to request for the container on which the shell command will run
- private long containerMemory = 10;
+ private static final long DEFAULT_CONTAINER_MEMORY = 10;
+ private long containerMemory = DEFAULT_CONTAINER_MEMORY;
// VirtualCores to request for the container on which the shell command will run
- private int containerVirtualCores = 1;
+ private static final int DEFAULT_CONTAINER_VCORES = 1;
+ private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
// Priority of the request
private int requestPriority;
+ // Resource profile for the container
+ private String containerResourceProfile = "";
+ Map<String, Resource> resourceProfiles;
+
// Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM
@@ -394,6 +402,8 @@ public class ApplicationMaster {
"Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true,
"Amount of virtual cores to be requested to run the shell command");
+ opts.addOption("container_resource_profile", true,
+ "Resource profile to be requested to run the shell command");
opts.addOption("num_containers", true,
"No. of containers on which the shell command needs to be executed");
opts.addOption("priority", true, "Application Priority. Default 0");
@@ -535,9 +545,11 @@ public class ApplicationMaster {
}
containerMemory = Integer.parseInt(cliParser.getOptionValue(
- "container_memory", "10"));
+ "container_memory", "-1"));
containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
- "container_vcores", "1"));
+ "container_vcores", "-1"));
+ containerResourceProfile =
+ cliParser.getOptionValue("container_resource_profile", "");
numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
"num_containers", "1"));
if (numTotalContainers == 0) {
@@ -656,6 +668,7 @@ public class ApplicationMaster {
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl);
+ resourceProfiles = response.getResourceProfiles();
// Dump out information about cluster capability as seen by the
// resource manager
long maxMem = response.getMaximumResourceCapability().getMemorySize();
@@ -1193,12 +1206,8 @@ public class ApplicationMaster {
Priority pri = Priority.newInstance(requestPriority);
// Set up resource type requirements
- // For now, memory and CPU are supported so we set memory and cpu requirements
- Resource capability = Resource.newInstance(containerMemory,
- containerVirtualCores);
-
- ContainerRequest request = new ContainerRequest(capability, null, null,
- pri);
+ ContainerRequest request =
+ new ContainerRequest(createProfileCapability(), null, null, pri);
LOG.info("Requested container ask: " + request.toString());
return request;
}
@@ -1459,4 +1468,36 @@ public class ApplicationMaster {
}
}
+ private ProfileCapability createProfileCapability()
+ throws YarnRuntimeException {
+ if (containerMemory < -1 || containerMemory == 0) {
+ throw new YarnRuntimeException("Value of AM memory '" + containerMemory
+ + "' has to be greater than 0");
+ }
+ if (containerVirtualCores < -1 || containerVirtualCores == 0) {
+ throw new YarnRuntimeException(
+ "Value of AM vcores '" + containerVirtualCores
+ + "' has to be greater than 0");
+ }
+
+ Resource resourceCapability =
+ Resource.newInstance(containerMemory, containerVirtualCores);
+ if (resourceProfiles == null) {
+ containerMemory = containerMemory == -1 ? DEFAULT_CONTAINER_MEMORY :
+ containerMemory;
+ containerVirtualCores =
+ containerVirtualCores == -1 ? DEFAULT_CONTAINER_VCORES :
+ containerVirtualCores;
+ resourceCapability.setMemorySize(containerMemory);
+ resourceCapability.setVirtualCores(containerVirtualCores);
+ }
+
+ String profileName = containerResourceProfile;
+ if ("".equals(containerResourceProfile) && resourceProfiles != null) {
+ profileName = "default";
+ }
+ ProfileCapability capability =
+ ProfileCapability.newInstance(profileName, resourceCapability);
+ return capability;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6800253/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index eedb501..0c6d2d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -66,10 +66,12 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -79,8 +81,9 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ResourceProfilesNotEnabledException;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
@@ -119,6 +122,11 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
public class Client {
private static final Log LOG = LogFactory.getLog(Client.class);
+
+ private static final int DEFAULT_AM_MEMORY = 100;
+ private static final int DEFAULT_AM_VCORES = 1;
+ private static final int DEFAULT_CONTAINER_MEMORY = 10;
+ private static final int DEFAULT_CONTAINER_VCORES = 1;
// Configuration
private Configuration conf;
@@ -130,9 +138,12 @@ public class Client {
// Queue for App master
private String amQueue = "";
// Amt. of memory resource to request for to run the App Master
- private long amMemory = 100;
+ private long amMemory = DEFAULT_AM_MEMORY;
// Amt. of virtual core resource to request for to run the App Master
- private int amVCores = 1;
+ private int amVCores = DEFAULT_AM_VCORES;
+
+ // AM resource profile
+ private String amResourceProfile = "";
// Application master jar file
private String appMasterJar = "";
@@ -151,9 +162,11 @@ public class Client {
private int shellCmdPriority = 0;
// Amt of memory to request for container in which shell script will be executed
- private int containerMemory = 10;
+ private long containerMemory = DEFAULT_CONTAINER_MEMORY;
// Amt. of virtual cores to request for container in which shell script will be executed
- private int containerVirtualCores = 1;
+ private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
+ // container resource profile
+ private String containerResourceProfile = "";
// No. of containers in which the shell script needs to be executed
private int numContainers = 1;
private String nodeLabelExpression = null;
@@ -256,6 +269,7 @@ public class Client {
opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
opts.addOption("jar", true, "Jar file containing the application master");
+ opts.addOption("master_resource_profile", true, "Resource profile for the application master");
opts.addOption("shell_command", true, "Shell command to be executed by " +
"the Application Master. Can only specify either --shell_command " +
"or --shell_script");
@@ -269,6 +283,7 @@ public class Client {
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
+ opts.addOption("container_resource_profile", true, "Resource profile for the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("log_properties", true, "log4j.properties file");
opts.addOption("keep_containers_across_application_attempts", false,
@@ -372,17 +387,11 @@ public class Client {
appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
- amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "100"));
- amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
-
- if (amMemory < 0) {
- throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
- + " Specified memory=" + amMemory);
- }
- if (amVCores < 0) {
- throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
- + " Specified virtual cores=" + amVCores);
- }
+ amMemory =
+ Integer.parseInt(cliParser.getOptionValue("master_memory", "-1"));
+ amVCores =
+ Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1"));
+ amResourceProfile = cliParser.getOptionValue("master_resource_profile", "");
if (!cliParser.hasOption("jar")) {
throw new IllegalArgumentException("No jar file specified for application master");
@@ -423,17 +432,18 @@ public class Client {
}
shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
- containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
- containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
- numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
-
-
- if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
- throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
- + " exiting."
- + " Specified containerMemory=" + containerMemory
- + ", containerVirtualCores=" + containerVirtualCores
- + ", numContainer=" + numContainers);
+ containerMemory =
+ Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
+ containerVirtualCores =
+ Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1"));
+ containerResourceProfile =
+ cliParser.getOptionValue("container_resource_profile", "");
+ numContainers =
+ Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+
+ if (numContainers < 1) {
+ throw new IllegalArgumentException("Invalid no. of containers specified,"
+ + " exiting. Specified numContainer=" + numContainers);
}
nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
@@ -540,6 +550,32 @@ public class Client {
prepareTimelineDomain();
}
+ Map<String, Resource> profiles;
+ try {
+ profiles = yarnClient.getResourceProfiles();
+ } catch (ResourceProfilesNotEnabledException re) {
+ profiles = null;
+ }
+
+ List<String> appProfiles = new ArrayList<>(2);
+ appProfiles.add(amResourceProfile);
+ appProfiles.add(containerResourceProfile);
+ for (String appProfile : appProfiles) {
+ if (appProfile != null && !appProfile.isEmpty()) {
+ if (profiles == null) {
+ String message = "Resource profiles is not enabled";
+ LOG.error(message);
+ throw new IOException(message);
+ }
+ if (!profiles.containsKey(appProfile)) {
+ String message = "Unknown resource profile '" + appProfile
+ + "'. Valid resource profiles are " + profiles.keySet();
+ LOG.error(message);
+ throw new IOException(message);
+ }
+ }
+ }
+
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
@@ -573,6 +609,13 @@ public class Client {
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
+ // Set up resource type requirements
+ // For now, both memory and vcores are supported, so we set memory and
+ // vcores requirements
+ setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile,
+ amPriority, profiles);
+ setContainerResources(containerMemory, containerVirtualCores, profiles);
+
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
@@ -696,8 +739,16 @@ public class Client {
// Set class name
vargs.add(appMasterMainClass);
// Set params for Application Master
- vargs.add("--container_memory " + String.valueOf(containerMemory));
- vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
+ if (containerMemory > 0) {
+ vargs.add("--container_memory " + String.valueOf(containerMemory));
+ }
+ if (containerVirtualCores > 0) {
+ vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
+ }
+ if (containerResourceProfile != null && !containerResourceProfile
+ .isEmpty()) {
+ vargs.add("--container_resource_profile " + containerResourceProfile);
+ }
vargs.add("--num_containers " + String.valueOf(numContainers));
if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression);
@@ -730,12 +781,6 @@ public class Client {
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
- // Set up resource type requirements
- // For now, both memory and vcores are supported, so we set memory and
- // vcores requirements
- Resource capability = Resource.newInstance(amMemory, amVCores);
- appContext.setResource(capability);
-
// Service data is a binary blob that can be passed to the application
// Not needed in this scenario
// amContainer.setServiceData(serviceData);
@@ -933,4 +978,63 @@ public class Client {
timelineClient.stop();
}
}
+
+ private void setAMResourceCapability(ApplicationSubmissionContext appContext,
+ long memory, int vcores, String profile, int priority,
+ Map<String, Resource> profiles) throws IllegalArgumentException {
+ if (memory < -1 || memory == 0) {
+ throw new IllegalArgumentException("Invalid memory specified for"
+ + " application master, exiting. Specified memory=" + memory);
+ }
+ if (vcores < -1 || vcores == 0) {
+ throw new IllegalArgumentException("Invalid virtual cores specified for"
+ + " application master, exiting. Specified virtual cores=" + vcores);
+ }
+ String tmp = profile;
+ if (profile.isEmpty()) {
+ tmp = "default";
+ }
+ if (appContext.getAMContainerResourceRequest() == null) {
+ appContext.setAMContainerResourceRequest(ResourceRequest
+ .newInstance(Priority.newInstance(priority), "*",
+ Resources.clone(Resources.none()), 1));
+ }
+
+ if (appContext.getAMContainerResourceRequest().getProfileCapability()
+ == null) {
+ appContext.getAMContainerResourceRequest().setProfileCapability(
+ ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0)));
+ }
+ Resource capability = Resource.newInstance(0, 0);
+ // set amMemory because it's used to set Xmx param
+ if (profiles == null) {
+ amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory;
+ amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores;
+ capability.setMemorySize(amMemory);
+ capability.setVirtualCores(amVCores);
+ } else {
+ amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory;
+ amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores;
+ capability.setMemorySize(memory);
+ capability.setVirtualCores(vcores);
+ }
+ appContext.getAMContainerResourceRequest().getProfileCapability()
+ .setProfileCapabilityOverride(capability);
+ }
+
+ private void setContainerResources(long memory, int vcores,
+ Map<String, Resource> profiles) throws IllegalArgumentException {
+ if (memory < -1 || memory == 0) {
+ throw new IllegalArgumentException(
+ "Container memory '" + memory + "' has to be greated than 0");
+ }
+ if (vcores < -1 || vcores == 0) {
+ throw new IllegalArgumentException(
+ "Container vcores '" + vcores + "' has to be greated than 0");
+ }
+ if (profiles == null) {
+ containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory;
+ containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6800253/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index ef21c87..5074c79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -1099,6 +1099,7 @@ public class TestDistributedShell {
"1"
};
client.init(args);
+ client.run();
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
Assert.assertTrue("The throw exception is not expected",
@@ -1326,4 +1327,32 @@ public class TestDistributedShell {
}
return numOfWords;
}
+
+ @Test
+ public void testDistributedShellResourceProfiles() throws Exception {
+ String[][] args = {
+ {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command",
+ Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile",
+ "maximum" },
+ {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command",
+ Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
+ "default" },
+ {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command",
+ Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
+ "default", "--container_resource_profile", "maximum" }
+ };
+
+ for (int i = 0; i < args.length; ++i) {
+ LOG.info("Initializing DS Client");
+ Client client = new Client(new Configuration(yarnCluster.getConfig()));
+ Assert.assertTrue(client.init(args[i]));
+ LOG.info("Running DS Client");
+ try {
+ client.run();
+ Assert.fail("Client run should throw error");
+ } catch (Exception e) {
+ continue;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6800253/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 662271a..1de7bc2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -534,7 +534,7 @@ public class TestAMRMClient {
List<? extends Collection<ContainerRequest>> matches,
int matchSize) {
assertEquals(1, matches.size());
- assertEquals(matches.get(0).size(), matchSize);
+ assertEquals(matchSize, matches.get(0).size());
}
@Test (timeout=60000)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6800253/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index e352111..4112624 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -140,6 +140,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ResourceProfilesNotEnabledException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -1796,7 +1797,8 @@ public class ClientRMService extends AbstractService implements
.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
if (!resourceProfilesEnabled) {
- throw new YarnException("Resource profiles are not enabled");
+ throw new ResourceProfilesNotEnabledException(
+ "Resource profiles are not enabled");
}
return resourceProfilesManager.getResourceProfiles();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6800253/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java
index 15479e0..8839bf9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java
@@ -49,9 +49,9 @@ public class ResourceProfilesManagerImpl implements ResourceProfilesManager {
private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
private static final String VCORES = ResourceInformation.VCORES.getName();
- private static final String DEFAULT_PROFILE = "default";
- private static final String MINIMUM_PROFILE = "minimum";
- private static final String MAXIMUM_PROFILE = "maximum";
+ public static final String DEFAULT_PROFILE = "default";
+ public static final String MINIMUM_PROFILE = "minimum";
+ public static final String MAXIMUM_PROFILE = "maximum";
private static final String[] MANDATORY_PROFILES =
{ DEFAULT_PROFILE, MINIMUM_PROFILE, MAXIMUM_PROFILE };
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b6800253/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 0449c35..ccec6bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -219,9 +219,15 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
return configuredMaxAllocation;
}
- return Resources.createResource(
- Math.min(configuredMaxAllocation.getMemorySize(), maxNodeMemory),
- Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores));
+ Resource ret = Resources.clone(configuredMaxAllocation);
+ if (ret.getMemorySize() > maxNodeMemory) {
+ ret.setMemorySize(maxNodeMemory);
+ }
+ if (ret.getVirtualCores() > maxNodeVCores) {
+ ret.setVirtualCores(maxNodeVCores);
+ }
+
+ return ret;
} finally {
readLock.unlock();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org