You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/30 22:59:03 UTC

[28/50] [abbrv] hadoop git commit: YARN-5588. [YARN-3926] Add support for resource profiles in distributed shell. Contributed by Varun Vasudev.

YARN-5588. [YARN-3926] Add support for resource profiles in distributed shell. Contributed by Varun Vasudev.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81e4c92d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81e4c92d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81e4c92d

Branch: refs/heads/YARN-3926
Commit: 81e4c92d48f418b72215d8fb3cea305238d9eb6c
Parents: 327a017
Author: Sunil G <su...@apache.org>
Authored: Mon Feb 27 21:44:14 2017 +0530
Committer: Wangda Tan <wa...@apache.org>
Committed: Wed Aug 30 15:49:01 2017 -0700

----------------------------------------------------------------------
 .../yarn/api/records/ProfileCapability.java     |  16 +-
 .../ResourceProfilesNotEnabledException.java    |  43 +++++
 .../distributedshell/ApplicationMaster.java     |  61 +++++--
 .../applications/distributedshell/Client.java   | 174 +++++++++++++++----
 .../distributedshell/TestDistributedShell.java  |  29 ++++
 .../yarn/client/api/impl/TestAMRMClient.java    |   2 +-
 .../server/resourcemanager/ClientRMService.java |   4 +-
 .../resource/ResourceProfilesManagerImpl.java   |   6 +-
 .../scheduler/ClusterNodeTracker.java           |  12 +-
 9 files changed, 288 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81e4c92d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
index faaddd5..1a8d1c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
@@ -150,17 +150,21 @@ public abstract class ProfileCapability {
         .checkArgument(capability != null, "Capability cannot be null");
     Preconditions.checkArgument(resourceProfilesMap != null,
         "Resource profiles map cannot be null");
+    Resource none = Resource.newInstance(0, 0);
     Resource resource = Resource.newInstance(0, 0);
-
-    if (resourceProfilesMap.containsKey(capability.getProfileName())) {
-      resource = Resource
-          .newInstance(resourceProfilesMap.get(capability.getProfileName()));
+    String profileName = capability.getProfileName();
+    if (profileName.isEmpty()) {
+      profileName = DEFAULT_PROFILE;
+    }
+    if (resourceProfilesMap.containsKey(profileName)) {
+      resource = Resource.newInstance(resourceProfilesMap.get(profileName));
     }
 
-    if(capability.getProfileCapabilityOverride()!= null) {
+    if (capability.getProfileCapabilityOverride() != null &&
+        !capability.getProfileCapabilityOverride().equals(none)) {
       for (Map.Entry<String, ResourceInformation> entry : capability
           .getProfileCapabilityOverride().getResources().entrySet()) {
-        if (entry.getValue() != null && entry.getValue().getValue() != 0) {
+        if (entry.getValue() != null && entry.getValue().getValue() >= 0) {
           resource.setResourceInformation(entry.getKey(), entry.getValue());
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81e4c92d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java
new file mode 100644
index 0000000..558e075
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceProfilesNotEnabledException.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+/**
+ * This exception is thrown when the client requests information about
+ * ResourceProfiles in the
+ * {@link org.apache.hadoop.yarn.api.ApplicationClientProtocol} but resource
+ * profiles are not enabled on the RM.
+ *
+ */
+public class ResourceProfilesNotEnabledException extends YarnException {
+
+  private static final long serialVersionUID = 13498237L;
+
+  public ResourceProfilesNotEnabledException(Throwable cause) {
+    super(cause);
+  }
+
+  public ResourceProfilesNotEnabledException(String message) {
+    super(message);
+  }
+
+  public ResourceProfilesNotEnabledException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81e4c92d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index a02af70..c167862 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.URL;
@@ -103,6 +104,7 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.TimelineServiceHelper;
@@ -231,12 +233,18 @@ public class ApplicationMaster {
   @VisibleForTesting
   protected int numTotalContainers = 1;
   // Memory to request for the container on which the shell command will run
-  private long containerMemory = 10;
+  private static final long DEFAULT_CONTAINER_MEMORY = 10;
+  private long containerMemory = DEFAULT_CONTAINER_MEMORY;
   // VirtualCores to request for the container on which the shell command will run
-  private int containerVirtualCores = 1;
+  private static final int DEFAULT_CONTAINER_VCORES = 1;
+  private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
   // Priority of the request
   private int requestPriority;
 
+  // Resource profile for the container
+  private String containerResourceProfile = "";
+  Map<String, Resource> resourceProfiles;
+
   // Counter for completed containers ( complete denotes successful or failed )
   private AtomicInteger numCompletedContainers = new AtomicInteger();
   // Allocated container count so that we know how many containers has the RM
@@ -407,6 +415,8 @@ public class ApplicationMaster {
         "Amount of memory in MB to be requested to run the shell command");
     opts.addOption("container_vcores", true,
         "Amount of virtual cores to be requested to run the shell command");
+    opts.addOption("container_resource_profile", true,
+        "Resource profile to be requested to run the shell command");
     opts.addOption("num_containers", true,
         "No. of containers on which the shell command needs to be executed");
     opts.addOption("priority", true, "Application Priority. Default 0");
@@ -548,9 +558,11 @@ public class ApplicationMaster {
     }
 
     containerMemory = Integer.parseInt(cliParser.getOptionValue(
-        "container_memory", "10"));
+        "container_memory", "-1"));
     containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
-        "container_vcores", "1"));
+        "container_vcores", "-1"));
+    containerResourceProfile =
+        cliParser.getOptionValue("container_resource_profile", "");
     numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
         "num_containers", "1"));
     if (numTotalContainers == 0) {
@@ -669,6 +681,7 @@ public class ApplicationMaster {
     RegisterApplicationMasterResponse response = amRMClient
         .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
             appMasterTrackingUrl);
+    resourceProfiles = response.getResourceProfiles();
     // Dump out information about cluster capability as seen by the
     // resource manager
     long maxMem = response.getMaximumResourceCapability().getMemorySize();
@@ -1216,12 +1229,8 @@ public class ApplicationMaster {
     Priority pri = Priority.newInstance(requestPriority);
 
     // Set up resource type requirements
-    // For now, memory and CPU are supported so we set memory and cpu requirements
-    Resource capability = Resource.newInstance(containerMemory,
-      containerVirtualCores);
-
-    ContainerRequest request = new ContainerRequest(capability, null, null,
-        pri);
+    ContainerRequest request =
+        new ContainerRequest(createProfileCapability(), null, null, pri);
     LOG.info("Requested container ask: " + request.toString());
     return request;
   }
@@ -1485,4 +1494,36 @@ public class ApplicationMaster {
     }
   }
 
+  private ProfileCapability createProfileCapability()
+      throws YarnRuntimeException {
+    if (containerMemory < -1 || containerMemory == 0) {
+      throw new YarnRuntimeException("Value of AM memory '" + containerMemory
+          + "' has to be greater than 0");
+    }
+    if (containerVirtualCores < -1 || containerVirtualCores == 0) {
+      throw new YarnRuntimeException(
+          "Value of AM vcores '" + containerVirtualCores
+              + "' has to be greater than 0");
+    }
+
+    Resource resourceCapability =
+        Resource.newInstance(containerMemory, containerVirtualCores);
+    if (resourceProfiles == null) {
+      containerMemory = containerMemory == -1 ? DEFAULT_CONTAINER_MEMORY :
+          containerMemory;
+      containerVirtualCores =
+          containerVirtualCores == -1 ? DEFAULT_CONTAINER_VCORES :
+              containerVirtualCores;
+      resourceCapability.setMemorySize(containerMemory);
+      resourceCapability.setVirtualCores(containerVirtualCores);
+    }
+
+    String profileName = containerResourceProfile;
+    if ("".equals(containerResourceProfile) && resourceProfiles != null) {
+      profileName = "default";
+    }
+    ProfileCapability capability =
+        ProfileCapability.newInstance(profileName, resourceCapability);
+    return capability;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81e4c92d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index eedb501..0c6d2d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -66,10 +66,12 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -79,8 +81,9 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ResourceProfilesNotEnabledException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 /**
@@ -119,6 +122,11 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 public class Client {
 
   private static final Log LOG = LogFactory.getLog(Client.class);
+
+  private static final int DEFAULT_AM_MEMORY = 100;
+  private static final int DEFAULT_AM_VCORES = 1;
+  private static final int DEFAULT_CONTAINER_MEMORY = 10;
+  private static final int DEFAULT_CONTAINER_VCORES = 1;
   
   // Configuration
   private Configuration conf;
@@ -130,9 +138,12 @@ public class Client {
   // Queue for App master
   private String amQueue = "";
   // Amt. of memory resource to request for to run the App Master
-  private long amMemory = 100;
+  private long amMemory = DEFAULT_AM_MEMORY;
   // Amt. of virtual core resource to request for to run the App Master
-  private int amVCores = 1;
+  private int amVCores = DEFAULT_AM_VCORES;
+
+  // AM resource profile
+  private String amResourceProfile = "";
 
   // Application master jar file
   private String appMasterJar = ""; 
@@ -151,9 +162,11 @@ public class Client {
   private int shellCmdPriority = 0;
 
   // Amt of memory to request for container in which shell script will be executed
-  private int containerMemory = 10; 
+  private long containerMemory = DEFAULT_CONTAINER_MEMORY;
   // Amt. of virtual cores to request for container in which shell script will be executed
-  private int containerVirtualCores = 1;
+  private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
+  // container resource profile
+  private String containerResourceProfile = "";
   // No. of containers in which the shell script needs to be executed
   private int numContainers = 1;
   private String nodeLabelExpression = null;
@@ -256,6 +269,7 @@ public class Client {
     opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
     opts.addOption("master_vcores", true, "Amount of virtual cores to be requested to run the application master");
     opts.addOption("jar", true, "Jar file containing the application master");
+    opts.addOption("master_resource_profile", true, "Resource profile for the application master");
     opts.addOption("shell_command", true, "Shell command to be executed by " +
         "the Application Master. Can only specify either --shell_command " +
         "or --shell_script");
@@ -269,6 +283,7 @@ public class Client {
     opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
     opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
     opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
+    opts.addOption("container_resource_profile", true, "Resource profile for the shell command");
     opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
     opts.addOption("log_properties", true, "log4j.properties file");
     opts.addOption("keep_containers_across_application_attempts", false,
@@ -372,17 +387,11 @@ public class Client {
     appName = cliParser.getOptionValue("appname", "DistributedShell");
     amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
     amQueue = cliParser.getOptionValue("queue", "default");
-    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "100"));
-    amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
-
-    if (amMemory < 0) {
-      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
-          + " Specified memory=" + amMemory);
-    }
-    if (amVCores < 0) {
-      throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
-          + " Specified virtual cores=" + amVCores);
-    }
+    amMemory =
+        Integer.parseInt(cliParser.getOptionValue("master_memory", "-1"));
+    amVCores =
+        Integer.parseInt(cliParser.getOptionValue("master_vcores", "-1"));
+    amResourceProfile = cliParser.getOptionValue("master_resource_profile", "");
 
     if (!cliParser.hasOption("jar")) {
       throw new IllegalArgumentException("No jar file specified for application master");
@@ -423,17 +432,18 @@ public class Client {
     }
     shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
 
-    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
-    containerVirtualCores = Integer.parseInt(cliParser.getOptionValue("container_vcores", "1"));
-    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
-    
-
-    if (containerMemory < 0 || containerVirtualCores < 0 || numContainers < 1) {
-      throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
-          + " exiting."
-          + " Specified containerMemory=" + containerMemory
-          + ", containerVirtualCores=" + containerVirtualCores
-          + ", numContainer=" + numContainers);
+    containerMemory =
+        Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
+    containerVirtualCores =
+        Integer.parseInt(cliParser.getOptionValue("container_vcores", "-1"));
+    containerResourceProfile =
+        cliParser.getOptionValue("container_resource_profile", "");
+    numContainers =
+        Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+
+    if (numContainers < 1) {
+      throw new IllegalArgumentException("Invalid no. of containers specified,"
+          + " exiting. Specified numContainer=" + numContainers);
     }
     
     nodeLabelExpression = cliParser.getOptionValue("node_label_expression", null);
@@ -540,6 +550,32 @@ public class Client {
       prepareTimelineDomain();
     }
 
+    Map<String, Resource> profiles;
+    try {
+      profiles = yarnClient.getResourceProfiles();
+    } catch (ResourceProfilesNotEnabledException re) {
+      profiles = null;
+    }
+
+    List<String> appProfiles = new ArrayList<>(2);
+    appProfiles.add(amResourceProfile);
+    appProfiles.add(containerResourceProfile);
+    for (String appProfile : appProfiles) {
+      if (appProfile != null && !appProfile.isEmpty()) {
+        if (profiles == null) {
+          String message = "Resource profiles is not enabled";
+          LOG.error(message);
+          throw new IOException(message);
+        }
+        if (!profiles.containsKey(appProfile)) {
+          String message = "Unknown resource profile '" + appProfile
+              + "'. Valid resource profiles are " + profiles.keySet();
+          LOG.error(message);
+          throw new IOException(message);
+        }
+      }
+    }
+
     // Get a new application id
     YarnClientApplication app = yarnClient.createApplication();
     GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
@@ -573,6 +609,13 @@ public class Client {
     ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
 
+    // Set up resource type requirements
+    // For now, both memory and vcores are supported, so we set memory and
+    // vcores requirements
+    setAMResourceCapability(appContext, amMemory, amVCores, amResourceProfile,
+        amPriority, profiles);
+    setContainerResources(containerMemory, containerVirtualCores, profiles);
+
     appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
     appContext.setApplicationName(appName);
 
@@ -696,8 +739,16 @@ public class Client {
     // Set class name 
     vargs.add(appMasterMainClass);
     // Set params for Application Master
-    vargs.add("--container_memory " + String.valueOf(containerMemory));
-    vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
+    if (containerMemory > 0) {
+      vargs.add("--container_memory " + String.valueOf(containerMemory));
+    }
+    if (containerVirtualCores > 0) {
+      vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
+    }
+    if (containerResourceProfile != null && !containerResourceProfile
+        .isEmpty()) {
+      vargs.add("--container_resource_profile " + containerResourceProfile);
+    }
     vargs.add("--num_containers " + String.valueOf(numContainers));
     if (null != nodeLabelExpression) {
       appContext.setNodeLabelExpression(nodeLabelExpression);
@@ -730,12 +781,6 @@ public class Client {
     ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
       localResources, env, commands, null, null, null);
 
-    // Set up resource type requirements
-    // For now, both memory and vcores are supported, so we set memory and 
-    // vcores requirements
-    Resource capability = Resource.newInstance(amMemory, amVCores);
-    appContext.setResource(capability);
-
     // Service data is a binary blob that can be passed to the application
     // Not needed in this scenario
     // amContainer.setServiceData(serviceData);
@@ -933,4 +978,63 @@ public class Client {
       timelineClient.stop();
     }
   }
+
+  private void setAMResourceCapability(ApplicationSubmissionContext appContext,
+      long memory, int vcores, String profile, int priority,
+      Map<String, Resource> profiles) throws IllegalArgumentException {
+    if (memory < -1 || memory == 0) {
+      throw new IllegalArgumentException("Invalid memory specified for"
+          + " application master, exiting. Specified memory=" + memory);
+    }
+    if (vcores < -1 || vcores == 0) {
+      throw new IllegalArgumentException("Invalid virtual cores specified for"
+          + " application master, exiting. Specified virtual cores=" + vcores);
+    }
+    String tmp = profile;
+    if (profile.isEmpty()) {
+      tmp = "default";
+    }
+    if (appContext.getAMContainerResourceRequest() == null) {
+      appContext.setAMContainerResourceRequest(ResourceRequest
+          .newInstance(Priority.newInstance(priority), "*",
+              Resources.clone(Resources.none()), 1));
+    }
+
+    if (appContext.getAMContainerResourceRequest().getProfileCapability()
+        == null) {
+      appContext.getAMContainerResourceRequest().setProfileCapability(
+          ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0)));
+    }
+    Resource capability = Resource.newInstance(0, 0);
+    // set amMemory because it's used to set Xmx param
+    if (profiles == null) {
+      amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory;
+      amVCores = vcores == -1 ? DEFAULT_AM_VCORES : vcores;
+      capability.setMemorySize(amMemory);
+      capability.setVirtualCores(amVCores);
+    } else {
+      amMemory = memory == -1 ? profiles.get(tmp).getMemorySize() : memory;
+      amVCores = vcores == -1 ? profiles.get(tmp).getVirtualCores() : vcores;
+      capability.setMemorySize(memory);
+      capability.setVirtualCores(vcores);
+    }
+    appContext.getAMContainerResourceRequest().getProfileCapability()
+        .setProfileCapabilityOverride(capability);
+  }
+
+  private void setContainerResources(long memory, int vcores,
+      Map<String, Resource> profiles) throws IllegalArgumentException {
+    if (memory < -1 || memory == 0) {
+      throw new IllegalArgumentException(
+          "Container memory '" + memory + "' has to be greated than 0");
+    }
+    if (vcores < -1 || vcores == 0) {
+      throw new IllegalArgumentException(
+          "Container vcores '" + vcores + "' has to be greated than 0");
+    }
+    if (profiles == null) {
+      containerMemory = memory == -1 ? DEFAULT_CONTAINER_MEMORY : memory;
+      containerVirtualCores = vcores == -1 ? DEFAULT_CONTAINER_VCORES : vcores;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81e4c92d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index fc270cb..b541cae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -1122,6 +1122,7 @@ public class TestDistributedShell {
           "1"
       };
       client.init(args);
+      client.run();
       Assert.fail("Exception is expected");
     } catch (IllegalArgumentException e) {
       Assert.assertTrue("The throw exception is not expected",
@@ -1349,4 +1350,32 @@ public class TestDistributedShell {
     }
     return numOfWords;
   }
+
+  @Test
+  public void testDistributedShellResourceProfiles() throws Exception {
+    String[][] args = {
+        {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command",
+            Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile",
+            "maximum" },
+        {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command",
+            Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
+            "default" },
+        {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command",
+            Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
+            "default", "--container_resource_profile", "maximum" }
+        };
+
+    for (int i = 0; i < args.length; ++i) {
+      LOG.info("Initializing DS Client");
+      Client client = new Client(new Configuration(yarnCluster.getConfig()));
+      Assert.assertTrue(client.init(args[i]));
+      LOG.info("Running DS Client");
+      try {
+        client.run();
+        Assert.fail("Client run should throw error");
+      } catch (Exception e) {
+        continue;
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81e4c92d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 662271a..1de7bc2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -534,7 +534,7 @@ public class TestAMRMClient {
                   List<? extends Collection<ContainerRequest>> matches,
                   int matchSize) {
     assertEquals(1, matches.size());
-    assertEquals(matches.get(0).size(), matchSize);
+    assertEquals(matchSize, matches.get(0).size());
   }
   
   @Test (timeout=60000)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81e4c92d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index faabe4d..a929c9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -141,6 +141,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
+import org.apache.hadoop.yarn.exceptions.ResourceProfilesNotEnabledException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -1800,7 +1801,8 @@ public class ClientRMService extends AbstractService implements
         .getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
             YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
     if (!resourceProfilesEnabled) {
-      throw new YarnException("Resource profiles are not enabled");
+      throw new ResourceProfilesNotEnabledException(
+          "Resource profiles are not enabled");
     }
     return resourceProfilesManager.getResourceProfiles();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81e4c92d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java
index 15479e0..8839bf9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java
@@ -49,9 +49,9 @@ public class ResourceProfilesManagerImpl implements ResourceProfilesManager {
   private static final String MEMORY = ResourceInformation.MEMORY_MB.getName();
   private static final String VCORES = ResourceInformation.VCORES.getName();
 
-  private static final String DEFAULT_PROFILE = "default";
-  private static final String MINIMUM_PROFILE = "minimum";
-  private static final String MAXIMUM_PROFILE = "maximum";
+  public static final String DEFAULT_PROFILE = "default";
+  public static final String MINIMUM_PROFILE = "minimum";
+  public static final String MAXIMUM_PROFILE = "maximum";
 
   private static final String[] MANDATORY_PROFILES =
       { DEFAULT_PROFILE, MINIMUM_PROFILE, MAXIMUM_PROFILE };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81e4c92d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 0449c35..ccec6bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -219,9 +219,15 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         return configuredMaxAllocation;
       }
 
-      return Resources.createResource(
-          Math.min(configuredMaxAllocation.getMemorySize(), maxNodeMemory),
-          Math.min(configuredMaxAllocation.getVirtualCores(), maxNodeVCores));
+      Resource ret = Resources.clone(configuredMaxAllocation);
+      if (ret.getMemorySize() > maxNodeMemory) {
+        ret.setMemorySize(maxNodeMemory);
+      }
+      if (ret.getVirtualCores() > maxNodeVCores) {
+        ret.setVirtualCores(maxNodeVCores);
+      }
+
+      return ret;
     } finally {
       readLock.unlock();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org