You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/12/13 20:16:09 UTC

[13/13] hadoop git commit: YARN-5587. Add support for resource profiles. (vvasudev via asuresh)

YARN-5587. Add support for resource profiles. (vvasudev via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb6e4f08
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb6e4f08
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb6e4f08

Branch: refs/heads/YARN-5501
Commit: bb6e4f08efe44024a551dbb19b02320ca56e7ac2
Parents: 0a30b9c
Author: Arun Suresh <as...@apache.org>
Authored: Tue Nov 15 01:01:07 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Tue Dec 13 12:10:33 2016 -0800

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |   4 +
 .../RegisterApplicationMasterResponse.java      |   8 +
 .../yarn/api/records/ProfileCapability.java     |  94 ++++++++++-
 .../hadoop/yarn/api/records/Resource.java       |  14 ++
 .../yarn/api/records/ResourceInformation.java   |  57 ++++++-
 .../yarn/api/records/ResourceRequest.java       |  43 ++++-
 .../hadoop-yarn/hadoop-yarn-client/pom.xml      |   1 +
 .../hadoop/yarn/client/api/AMRMClient.java      | 117 +++++++++++++-
 .../yarn/client/api/impl/AMRMClientImpl.java    | 152 ++++++++++-------
 .../client/api/impl/RemoteRequestsTable.java    | 109 +++++++++----
 .../yarn/client/api/impl/TestAMRMClient.java    | 147 +++++++++++++++--
 .../impl/TestAMRMClientContainerRequest.java    |   8 +-
 .../api/impl/TestDistributedScheduling.java     |  12 +-
 .../yarn/client/api/impl/TestNMClient.java      |   5 +-
 .../TestOpportunisticContainerAllocation.java   |  15 +-
 .../src/test/resources/resource-profiles.json   |  18 +++
 ...RegisterApplicationMasterResponsePBImpl.java |  58 +++++++
 .../api/records/impl/pb/ResourcePBImpl.java     |   4 +-
 .../records/impl/pb/ResourceRequestPBImpl.java  |  41 ++++-
 .../yarn/util/resource/ResourceUtils.java       | 161 ++++++++++++++++++-
 .../hadoop/yarn/util/resource/Resources.java    |  10 +-
 .../ApplicationMasterService.java               |  12 ++
 .../server/resourcemanager/RMServerUtils.java   |  50 ++++++
 .../resource/ResourceProfilesManagerImpl.java   |   4 +
 .../scheduler/AbstractYarnScheduler.java        |  44 +++++
 .../scheduler/ClusterNodeTracker.java           |   3 +-
 .../scheduler/SchedulerUtils.java               |  10 ++
 .../scheduler/capacity/CapacityScheduler.java   |   4 +-
 .../scheduler/fair/FairScheduler.java           |   4 +-
 .../scheduler/fifo/FifoScheduler.java           |  13 +-
 .../yarn/server/resourcemanager/MockRM.java     |   2 +
 .../server/resourcemanager/TestAppManager.java  |   1 +
 .../TestApplicationMasterService.java           |  35 ++++
 .../scheduler/fair/TestFairScheduler.java       |   4 +
 .../hadoop/yarn/server/MiniYARNCluster.java     |   2 +
 35 files changed, 1102 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index ab36a4e..de58251 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -151,6 +151,10 @@
     <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
   </Match>
   <Match>
+    <Class name="org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl$ProfileCapabilityComparator" />
+    <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+  </Match>
+  <Match>
     <Class name="org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl" />
     <Field name="builder" />
     <Bug pattern="SE_BAD_FIELD" />

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
index 0b886dd..8fa8563 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java
@@ -204,4 +204,12 @@ public abstract class RegisterApplicationMasterResponse {
   @Unstable
   public abstract void setSchedulerResourceTypes(
       EnumSet<SchedulerResourceTypes> types);
+
+  @Public
+  @Unstable
+  public abstract Map<String, Resource> getResourceProfiles();
+
+  @Private
+  @Unstable
+  public abstract void setResourceProfiles(Map<String, Resource> profiles);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
index 0a93b89..faaddd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java
@@ -18,41 +18,93 @@
 
 package org.apache.hadoop.yarn.api.records;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.util.Records;
 
+import java.util.Map;
+
 /**
  * Class to capture capability requirements when using resource profiles. The
  * ProfileCapability is meant to be used as part of the ResourceRequest. A
  * profile capability has two pieces - the resource profile name and the
  * overrides. The resource profile specifies the name of the resource profile
  * to be used and the capability override is the overrides desired on specific
- * resource types. For example, you could use the "minimum" profile and set the
- * memory in the capability override to 4096M. This implies that you wish for
- * the resources specified in the "minimum" profile but with 4096M memory. The
- * conversion from the ProfileCapability to the Resource class with the actual
- * resource requirements will be done by the ResourceManager, which has the
- * actual profile to Resource mapping.
+ * resource types.
+ *
+ * For example, if you have a resource profile "small" that maps to
+ * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
+ * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
+ * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
+ *
+ * Note that the conversion from the ProfileCapability to the Resource class
+ * with the actual resource requirements will be done by the ResourceManager,
+ * which has the actual profile to Resource mapping.
+ *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
 public abstract class ProfileCapability {
 
+  public static final String DEFAULT_PROFILE = "default";
+
+  public static ProfileCapability newInstance(Resource override) {
+    return newInstance(DEFAULT_PROFILE, override);
+  }
+
+  public static ProfileCapability newInstance(String profile) {
+    Preconditions
+        .checkArgument(profile != null, "The profile name cannot be null");
+    ProfileCapability obj = Records.newRecord(ProfileCapability.class);
+    obj.setProfileName(profile);
+    obj.setProfileCapabilityOverride(Resource.newInstance(0, 0));
+    return obj;
+  }
+
   public static ProfileCapability newInstance(String profile,
       Resource override) {
+    Preconditions
+        .checkArgument(profile != null, "The profile name cannot be null");
     ProfileCapability obj = Records.newRecord(ProfileCapability.class);
     obj.setProfileName(profile);
     obj.setProfileCapabilityOverride(override);
     return obj;
   }
 
+  /**
+   * Get the profile name.
+   * @return the profile name
+   */
   public abstract String getProfileName();
 
+  /**
+   * Get the profile capability override.
+   * @return Resource object containing the override.
+   */
   public abstract Resource getProfileCapabilityOverride();
 
+  /**
+   * Set the resource profile name.
+   * @param profileName the resource profile name
+   */
   public abstract void setProfileName(String profileName);
 
+  /**
+   * Set the capability override to override specific resource types on the
+   * resource profile.
+   *
+   * For example, if you have a resource profile "small" that maps to
+   * {@literal <4096M, 2 cores, 1 gpu>} and you set the capability override to
+   * {@literal <8192M, 0 cores, 0 gpu>}, then the actual resource allocation on
+   * the ResourceManager will be {@literal <8192M, 2 cores, 1 gpu>}.
+   *
+   * Note that the conversion from the ProfileCapability to the Resource class
+   * with the actual resource requirements will be done by the ResourceManager,
+   * which has the actual profile to Resource mapping.
+   *
+   * @param r Resource object containing the capability override
+   */
   public abstract void setProfileCapabilityOverride(Resource r);
 
   @Override
@@ -85,4 +137,34 @@ public abstract class ProfileCapability {
     return "{ profile: " + this.getProfileName() + ", capabilityOverride: "
         + this.getProfileCapabilityOverride() + " }";
   }
+
+  /**
+   * Get a representation of the capability as a Resource object.
+   * @param capability the capability we wish to convert
+   * @param resourceProfilesMap map of profile name to Resource object
+   * @return Resource object representing the capability
+   */
+  public static Resource toResource(ProfileCapability capability,
+      Map<String, Resource> resourceProfilesMap) {
+    Preconditions
+        .checkArgument(capability != null, "Capability cannot be null");
+    Preconditions.checkArgument(resourceProfilesMap != null,
+        "Resource profiles map cannot be null");
+    Resource resource = Resource.newInstance(0, 0);
+
+    if (resourceProfilesMap.containsKey(capability.getProfileName())) {
+      resource = Resource
+          .newInstance(resourceProfilesMap.get(capability.getProfileName()));
+    }
+
+    if(capability.getProfileCapabilityOverride()!= null) {
+      for (Map.Entry<String, ResourceInformation> entry : capability
+          .getProfileCapabilityOverride().getResources().entrySet()) {
+        if (entry.getValue() != null && entry.getValue().getValue() != 0) {
+          resource.setResourceInformation(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+    return resource;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
index 4bbca5a..103bcfd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.yarn.api.records;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@@ -74,6 +76,18 @@ public abstract class Resource implements Comparable<Resource> {
     return resource;
   }
 
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static Resource newInstance(Resource resource) {
+    Resource ret = Resource.newInstance(0, 0);
+    for (Map.Entry<String, ResourceInformation> entry : resource.getResources()
+        .entrySet()) {
+      ret.setResourceInformation(entry.getKey(),
+          ResourceInformation.newInstance(entry.getValue()));
+    }
+    return ret;
+  }
+
   /**
    * This method is DEPRECATED:
    * Use {@link Resource#getMemorySize()} instead

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
index a17e81b..7d74efc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
@@ -31,6 +31,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
   private String units;
   private ResourceTypes resourceType;
   private Long value;
+  private Long minimumAllocation;
+  private Long maximumAllocation;
 
   private static final String MEMORY_URI = "memory-mb";
   private static final String VCORES_URI = "vcores";
@@ -118,6 +120,42 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
   }
 
   /**
+   * Get the minimum allocation for the resource.
+   *
+   * @return the minimum allocation for the resource
+   */
+  public Long getMinimumAllocation() {
+    return minimumAllocation;
+  }
+
+  /**
+   * Set the minimum allocation for the resource.
+   *
+   * @param minimumAllocation the minimum allocation for the resource
+   */
+  public void setMinimumAllocation(Long minimumAllocation) {
+    this.minimumAllocation = minimumAllocation;
+  }
+
+  /**
+   * Get the maximum allocation for the resource.
+   *
+   * @return the maximum allocation for the resource
+   */
+  public Long getMaximumAllocation() {
+    return maximumAllocation;
+  }
+
+  /**
+   * Set the maximum allocation for the resource.
+   *
+   * @param maximumAllocation the maximum allocation for the resource
+   */
+  public void setMaximumAllocation(Long maximumAllocation) {
+    this.maximumAllocation = maximumAllocation;
+  }
+
+  /**
    * Create a new instance of ResourceInformation from another object.
    *
    * @param other the object from which the new object should be created
@@ -129,33 +167,41 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
     ret.setResourceType(other.getResourceType());
     ret.setUnits(other.getUnits());
     ret.setValue(other.getValue());
+    ret.setMinimumAllocation(other.getMinimumAllocation());
+    ret.setMaximumAllocation(other.getMaximumAllocation());
     return ret;
   }
 
   public static ResourceInformation newInstance(String name, String units,
-      Long value, ResourceTypes type) {
+      Long value, ResourceTypes type, Long minimumAllocation,
+      Long maximumAllocation) {
     ResourceInformation ret = new ResourceInformation();
     ret.setName(name);
     ret.setResourceType(type);
     ret.setUnits(units);
     ret.setValue(value);
+    ret.setMinimumAllocation(minimumAllocation);
+    ret.setMaximumAllocation(maximumAllocation);
     return ret;
   }
 
   public static ResourceInformation newInstance(String name, String units,
       Long value) {
     return ResourceInformation
-        .newInstance(name, units, value, ResourceTypes.COUNTABLE);
+        .newInstance(name, units, value, ResourceTypes.COUNTABLE, 0L,
+            Long.MAX_VALUE);
   }
 
   public static ResourceInformation newInstance(String name, String units) {
     return ResourceInformation
-        .newInstance(name, units, 0L, ResourceTypes.COUNTABLE);
+        .newInstance(name, units, 0L, ResourceTypes.COUNTABLE, 0L,
+            Long.MAX_VALUE);
   }
 
   public static ResourceInformation newInstance(String name, Long value) {
     return ResourceInformation
-        .newInstance(name, "", value, ResourceTypes.COUNTABLE);
+        .newInstance(name, "", value, ResourceTypes.COUNTABLE, 0L,
+            Long.MAX_VALUE);
   }
 
   public static ResourceInformation newInstance(String name) {
@@ -165,7 +211,8 @@ public class ResourceInformation implements Comparable<ResourceInformation> {
   @Override
   public String toString() {
     return "name: " + this.name + ", units: " + this.units + ", type: "
-        + resourceType + ", value: " + value;
+        + resourceType + ", value: " + value + ", minimum allocation: "
+        + minimumAllocation + ", maximum allocation: " + maximumAllocation;
   }
 
   public String getShorthandRepresentation() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index ce85b21..32356ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -99,7 +99,22 @@ public abstract class ResourceRequest extends AbstractResourceRequest
         .resourceName(hostName).capability(capability)
         .numContainers(numContainers).relaxLocality(relaxLocality)
         .nodeLabelExpression(labelExpression)
-        .executionTypeRequest(executionTypeRequest).build();
+        .executionTypeRequest(executionTypeRequest).profileCapability(null)
+        .build();
+  }
+
+  @Public
+  @Evolving
+  public static ResourceRequest newInstance(Priority priority, String hostName,
+      Resource capability, int numContainers, boolean relaxLocality,
+      String labelExpression, ExecutionTypeRequest executionTypeRequest,
+      ProfileCapability profile) {
+    return ResourceRequest.newBuilder().priority(priority)
+        .resourceName(hostName).capability(capability)
+        .numContainers(numContainers).relaxLocality(relaxLocality)
+        .nodeLabelExpression(labelExpression)
+        .executionTypeRequest(executionTypeRequest).profileCapability(profile)
+        .build();
   }
 
   @Public
@@ -125,6 +140,7 @@ public abstract class ResourceRequest extends AbstractResourceRequest
       resourceRequest.setRelaxLocality(true);
       resourceRequest.setExecutionTypeRequest(
           ExecutionTypeRequest.newInstance());
+      resourceRequest.setProfileCapability(null);
     }
 
     /**
@@ -239,6 +255,21 @@ public abstract class ResourceRequest extends AbstractResourceRequest
     }
 
     /**
+     * Set the <code>resourceProfile</code> of the request.
+     * @see ResourceRequest#setProfileCapability(ProfileCapability)
+     * @param profileCapability
+     *          <code>profileCapability</code> of the request
+     * @return {@link ResourceRequestBuilder}
+     */
+    @Public
+    @Evolving
+    public ResourceRequestBuilder profileCapability(
+        ProfileCapability profileCapability) {
+      resourceRequest.setProfileCapability(profileCapability);
+      return this;
+    }
+
+    /**
      * Return generated {@link ResourceRequest} object.
      * @return {@link ResourceRequest}
      */
@@ -455,6 +486,14 @@ public abstract class ResourceRequest extends AbstractResourceRequest
   @Evolving
   public abstract void setNodeLabelExpression(String nodelabelExpression);
 
+  @Public
+  @Evolving
+  public abstract ProfileCapability getProfileCapability();
+
+  @Public
+  @Evolving
+  public abstract void setProfileCapability(ProfileCapability p);
+
   /**
    * Get the optional <em>ID</em> corresponding to this allocation request. This
    * ID is an identifier for different {@code ResourceRequest}s from the <b>same
@@ -514,12 +553,14 @@ public abstract class ResourceRequest extends AbstractResourceRequest
     Resource capability = getCapability();
     String hostName = getResourceName();
     Priority priority = getPriority();
+    ProfileCapability profile = getProfileCapability();
     result =
         prime * result + ((capability == null) ? 0 : capability.hashCode());
     result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
     result = prime * result + getNumContainers();
     result = prime * result + ((priority == null) ? 0 : priority.hashCode());
     result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode();
+    result = prime * result + ((profile == null) ? 0 : profile.hashCode());
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index 5118303..9320795 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -144,6 +144,7 @@
             <exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
             <exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
             <exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
+            <exclude>src/test/resources/resource-profiles.json</exclude>
           </excludes>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 52155f5..f857fa0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -113,6 +114,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     private boolean relaxLocality;
     private String nodeLabelsExpression;
     private ExecutionTypeRequest executionTypeRequest;
+    private String resourceProfile;
     
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints and
@@ -159,6 +161,26 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       this(capability, nodes, racks, priority, allocationRequestId, true, null,
           ExecutionTypeRequest.newInstance());
     }
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints and
+     * locality relaxation enabled.
+     *
+     * @param capability
+     *          The {@link ProfileCapability} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     */
+    public ContainerRequest(ProfileCapability capability, String[] nodes,
+        String[] racks, Priority priority) {
+      this(capability, nodes, racks, priority, 0, true, null);
+    }
     
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints.
@@ -187,6 +209,29 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      * Instantiates a {@link ContainerRequest} with the given constraints.
      *
      * @param capability
+     *          The {@link ProfileCapability} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     * @param relaxLocality
+     *          If true, containers for this request may be assigned on hosts
+     *          and racks other than the ones explicitly requested.
+     */
+    public ContainerRequest(ProfileCapability capability, String[] nodes,
+        String[] racks, Priority priority, boolean relaxLocality) {
+      this(capability, nodes, racks, priority, 0, relaxLocality, null);
+    }
+
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints.
+     *
+     * @param capability
      *          The {@link Resource} to be requested for each container.
      * @param nodes
      *          Any hosts to request that the containers are placed on.
@@ -273,6 +318,59 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
           relaxLocality, nodeLabelsExpression,
           ExecutionTypeRequest.newInstance());
     }
+
+    public ContainerRequest(ProfileCapability capability, String[] nodes,
+        String[] racks, Priority priority, long allocationRequestId,
+        boolean relaxLocality, String nodeLabelsExpression) {
+      this(capability, nodes, racks, priority, allocationRequestId,
+          relaxLocality, nodeLabelsExpression,
+          ExecutionTypeRequest.newInstance());
+    }
+
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints.
+     *
+     * @param capability
+     *          The {@link Resource} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     * @param allocationRequestId
+     *          The allocationRequestId of the request. To be used as a tracking
+     *          id to match Containers allocated against this request. Will
+     *          default to 0 if not specified.
+     * @param relaxLocality
+     *          If true, containers for this request may be assigned on hosts
+     *          and racks other than the ones explicitly requested.
+     * @param nodeLabelsExpression
+     *          Set node labels to allocate resource, now we only support
+     *          asking for only a single node label
+     * @param executionTypeRequest
+     *          Set the execution type of the container request.
+     */
+    public ContainerRequest(Resource capability, String[] nodes, String[] racks,
+        Priority priority, long allocationRequestId, boolean relaxLocality,
+        String nodeLabelsExpression,
+        ExecutionTypeRequest executionTypeRequest) {
+      this(capability, nodes, racks, priority, allocationRequestId,
+          relaxLocality, nodeLabelsExpression, executionTypeRequest,
+          ProfileCapability.DEFAULT_PROFILE);
+    }
+
+    public ContainerRequest(ProfileCapability capability, String[] nodes,
+        String[] racks, Priority priority, long allocationRequestId,
+        boolean relaxLocality, String nodeLabelsExpression,
+        ExecutionTypeRequest executionTypeRequest) {
+      this(capability.getProfileCapabilityOverride(), nodes, racks, priority,
+          allocationRequestId, relaxLocality, nodeLabelsExpression,
+          executionTypeRequest, capability.getProfileName());
+    }
           
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints.
@@ -300,11 +398,13 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      *          asking for only a single node label
      * @param executionTypeRequest
      *          Set the execution type of the container request.
+     * @param profile
+     *          Set the resource profile for the container request
      */
     public ContainerRequest(Resource capability, String[] nodes, String[] racks,
         Priority priority, long allocationRequestId, boolean relaxLocality,
         String nodeLabelsExpression,
-        ExecutionTypeRequest executionTypeRequest) {
+        ExecutionTypeRequest executionTypeRequest, String profile) {
       this.allocationRequestId = allocationRequestId;
       this.capability = capability;
       this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
@@ -313,6 +413,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       this.relaxLocality = relaxLocality;
       this.nodeLabelsExpression = nodeLabelsExpression;
       this.executionTypeRequest = executionTypeRequest;
+      this.resourceProfile = profile;
       sanityCheck();
     }
 
@@ -364,6 +465,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       return executionTypeRequest;
     }
 
+    public String getResourceProfile() {
+      return resourceProfile;
+    }
+
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("Capability[").append(capability).append("]");
@@ -371,6 +476,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       sb.append("AllocationRequestId[").append(allocationRequestId).append("]");
       sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
           .append("]");
+      sb.append("Resource Profile[").append(resourceProfile).append("]");
       return sb.toString();
     }
 
@@ -597,6 +703,15 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
         " AMRMClient is expected to implement this !!");
   }
 
+
+  @InterfaceStability.Evolving
+  public List<? extends Collection<T>> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      ProfileCapability capability) {
+    throw new UnsupportedOperationException("The sub-class extending" +
+        " AMRMClient is expected to implement this !!");
+  }
+
   /**
    * Get outstanding <code>ContainerRequest</code>s matching the given
    * allocationRequestId. These ContainerRequests should have been added via

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 44fc1e0..5b85527 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -105,56 +106,56 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected final Set<String> blacklistedNodes = new HashSet<String>();
   protected final Set<String> blacklistAdditions = new HashSet<String>();
   protected final Set<String> blacklistRemovals = new HashSet<String>();
+
+  protected Map<String, Resource> resourceProfilesMap;
   
   static class ResourceRequestInfo<T> {
     ResourceRequest remoteRequest;
     LinkedHashSet<T> containerRequests;
-    
+
     ResourceRequestInfo(Long allocationRequestId, Priority priority,
-        String resourceName, Resource capability, boolean relaxLocality) {
+        String resourceName, Resource capability, boolean relaxLocality,
+        String resourceProfile) {
+      ProfileCapability profileCapability = ProfileCapability
+          .newInstance(resourceProfile, capability);
       remoteRequest = ResourceRequest.newBuilder().priority(priority)
           .resourceName(resourceName).capability(capability).numContainers(0)
-          .allocationRequestId(allocationRequestId)
-          .relaxLocality(relaxLocality).build();
+          .allocationRequestId(allocationRequestId).relaxLocality(relaxLocality)
+          .profileCapability(profileCapability).build();
       containerRequests = new LinkedHashSet<T>();
     }
   }
 
   /**
-   * Class compares Resource by memory then cpu in reverse order
+   * Class compares Resource by memory, then cpu and then the remaining resource
+   * types in reverse order.
    */
-  static class ResourceReverseMemoryThenCpuComparator implements
-      Comparator<Resource>, Serializable {
-    static final long serialVersionUID = 12345L;
-    @Override
-    public int compare(Resource arg0, Resource arg1) {
-      long mem0 = arg0.getMemorySize();
-      long mem1 = arg1.getMemorySize();
-      long cpu0 = arg0.getVirtualCores();
-      long cpu1 = arg1.getVirtualCores();
-      if(mem0 == mem1) {
-        if(cpu0 == cpu1) {
-          return 0;
-        }
-        if(cpu0 < cpu1) {
-          return 1;
-        }
-        return -1;
-      }
-      if(mem0 < mem1) { 
-        return 1;
-      }
-      return -1;
-    }    
+  static class ProfileCapabilityComparator<T extends ProfileCapability>
+      implements Comparator<T> {
+
+    HashMap<String, Resource> resourceProfilesMap;
+
+    public ProfileCapabilityComparator(
+        HashMap<String, Resource> resourceProfileMap) {
+      this.resourceProfilesMap = resourceProfileMap;
+    }
+
+    public int compare(T arg0, T arg1) {
+      Resource resource0 =
+          ProfileCapability.toResource(arg0, resourceProfilesMap);
+      Resource resource1 =
+          ProfileCapability.toResource(arg1, resourceProfilesMap);
+      return resource1.compareTo(resource0);
+    }
   }
 
-  static boolean canFit(Resource arg0, Resource arg1) {
-    long mem0 = arg0.getMemorySize();
-    long mem1 = arg1.getMemorySize();
-    long cpu0 = arg0.getVirtualCores();
-    long cpu1 = arg1.getVirtualCores();
-    
-    return (mem0 <= mem1 && cpu0 <= cpu1);
+  boolean canFit(ProfileCapability arg0, ProfileCapability arg1) {
+    Resource resource0 =
+        ProfileCapability.toResource(arg0, resourceProfilesMap);
+    Resource resource1 =
+        ProfileCapability.toResource(arg1, resourceProfilesMap);
+    return Resources.fitsIn(resource0, resource1);
+
   }
 
   private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
@@ -232,6 +233,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     return registerApplicationMaster();
   }
 
+  @SuppressWarnings("unchecked")
   private RegisterApplicationMasterResponse registerApplicationMaster()
       throws YarnException, IOException {
     RegisterApplicationMasterRequest request =
@@ -244,6 +246,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {
         populateNMTokens(response.getNMTokensFromPreviousAttempts());
       }
+      this.resourceProfilesMap = response.getResourceProfiles();
     }
     return response;
   }
@@ -416,13 +419,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     for(ResourceRequest r : ask) {
       // create a copy of ResourceRequest as we might change it while the
       // RPC layer is using it to send info across
-      ResourceRequest rr = ResourceRequest.newBuilder()
-          .priority(r.getPriority()).resourceName(r.getResourceName())
-          .capability(r.getCapability()).numContainers(r.getNumContainers())
-          .relaxLocality(r.getRelaxLocality())
-          .nodeLabelExpression(r.getNodeLabelExpression())
-          .executionTypeRequest(r.getExecutionTypeRequest())
-          .allocationRequestId(r.getAllocationRequestId()).build();
+      ResourceRequest rr =
+          ResourceRequest.newBuilder().priority(r.getPriority())
+              .resourceName(r.getResourceName()).capability(r.getCapability())
+              .numContainers(r.getNumContainers())
+              .relaxLocality(r.getRelaxLocality())
+              .nodeLabelExpression(r.getNodeLabelExpression())
+              .executionTypeRequest(r.getExecutionTypeRequest())
+              .allocationRequestId(r.getAllocationRequestId())
+              .profileCapability(r.getProfileCapability()).build();
       askList.add(rr);
     }
     return askList;
@@ -504,6 +509,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public synchronized void addContainerRequest(T req) {
     Preconditions.checkArgument(req != null,
         "Resource request can not be null.");
+    ProfileCapability profileCapability = ProfileCapability
+        .newInstance(req.getResourceProfile(), req.getCapability());
     Set<String> dedupedRacks = new HashSet<String>();
     if (req.getRacks() != null) {
       dedupedRacks.addAll(req.getRacks());
@@ -516,6 +523,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     Set<String> inferredRacks = resolveRacks(req.getNodes());
     inferredRacks.removeAll(dedupedRacks);
 
+    checkResourceProfile(req.getResourceProfile());
+
     // check that specific and non-specific requests cannot be mixed within a
     // priority
     checkLocalityRelaxationConflict(req.getAllocationRequestId(),
@@ -540,26 +549,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       }
       for (String node : dedupedNodes) {
         addResourceRequest(req.getPriority(), node,
-            req.getExecutionTypeRequest(), req.getCapability(), req, true,
+            req.getExecutionTypeRequest(), profileCapability, req, true,
             req.getNodeLabelExpression());
       }
     }
 
     for (String rack : dedupedRacks) {
       addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
-          req.getCapability(), req, true, req.getNodeLabelExpression());
+          profileCapability, req, true, req.getNodeLabelExpression());
     }
 
     // Ensure node requests are accompanied by requests for
     // corresponding rack
     for (String rack : inferredRacks) {
       addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
-          req.getCapability(), req, req.getRelaxLocality(),
+          profileCapability, req, req.getRelaxLocality(),
           req.getNodeLabelExpression());
     }
     // Off-switch
     addResourceRequest(req.getPriority(), ResourceRequest.ANY,
-        req.getExecutionTypeRequest(), req.getCapability(), req,
+        req.getExecutionTypeRequest(), profileCapability, req,
         req.getRelaxLocality(), req.getNodeLabelExpression());
   }
 
@@ -567,6 +576,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public synchronized void removeContainerRequest(T req) {
     Preconditions.checkArgument(req != null,
         "Resource request can not be null.");
+    ProfileCapability profileCapability = ProfileCapability
+        .newInstance(req.getResourceProfile(), req.getCapability());
     Set<String> allRacks = new HashSet<String>();
     if (req.getRacks() != null) {
       allRacks.addAll(req.getRacks());
@@ -577,17 +588,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     if (req.getNodes() != null) {
       for (String node : new HashSet<String>(req.getNodes())) {
         decResourceRequest(req.getPriority(), node,
-            req.getExecutionTypeRequest(), req.getCapability(), req);
+            req.getExecutionTypeRequest(), profileCapability, req);
       }
     }
 
     for (String rack : allRacks) {
       decResourceRequest(req.getPriority(), rack,
-          req.getExecutionTypeRequest(), req.getCapability(), req);
+          req.getExecutionTypeRequest(), profileCapability, req);
     }
 
     decResourceRequest(req.getPriority(), ResourceRequest.ANY,
-        req.getExecutionTypeRequest(), req.getCapability(), req);
+        req.getExecutionTypeRequest(), profileCapability, req);
   }
 
   @Override
@@ -660,6 +671,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public synchronized List<? extends Collection<T>> getMatchingRequests(
       Priority priority, String resourceName, ExecutionType executionType,
       Resource capability) {
+    ProfileCapability profileCapability =
+        ProfileCapability.newInstance(capability);
+    return getMatchingRequests(priority, resourceName, executionType,
+        profileCapability);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public synchronized List<? extends Collection<T>> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      ProfileCapability capability) {
     Preconditions.checkArgument(capability != null,
         "The Resource to be requested should not be null ");
     Preconditions.checkArgument(priority != null,
@@ -669,22 +691,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     RemoteRequestsTable remoteRequestsTable = getTable(0);
 
     if (null != remoteRequestsTable) {
-      List<ResourceRequestInfo<T>> matchingRequests =
-          remoteRequestsTable.getMatchingRequests(priority, resourceName,
-              executionType, capability);
+      List<ResourceRequestInfo<T>> matchingRequests = remoteRequestsTable
+          .getMatchingRequests(priority, resourceName, executionType,
+              capability);
       if (null != matchingRequests) {
         // If no exact match. Container may be larger than what was requested.
         // get all resources <= capability. map is reverse sorted.
         for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
-          if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
-              !resReqInfo.containerRequests.isEmpty()) {
+          if (canFit(resReqInfo.remoteRequest.getProfileCapability(),
+              capability) && !resReqInfo.containerRequests.isEmpty()) {
             list.add(resReqInfo.containerRequests);
           }
         }
       }
     }
     // no match found
-    return list;          
+    return list;
   }
   
   private Set<String> resolveRacks(List<String> nodes) {
@@ -732,6 +754,15 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       }
     }
   }
+
+  private void checkResourceProfile(String profile) {
+    if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty()
+        && !resourceProfilesMap.containsKey(profile)) {
+      throw new InvalidContainerRequestException(
+          "Invalid profile name, valid profile names are " + resourceProfilesMap
+              .keySet());
+    }
+  }
   
   /**
    * Valid if a node label expression specified on container request is valid or
@@ -788,12 +819,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   }
 
   private void addResourceRequest(Priority priority, String resourceName,
-      ExecutionTypeRequest execTypeReq, Resource capability, T req,
+      ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req,
       boolean relaxLocality, String labelExpression) {
     RemoteRequestsTable<T> remoteRequestsTable =
         getTable(req.getAllocationRequestId());
     if (remoteRequestsTable == null) {
       remoteRequestsTable = new RemoteRequestsTable<T>();
+      if (this.resourceProfilesMap instanceof HashMap) {
+        remoteRequestsTable.setResourceComparator(
+            new ProfileCapabilityComparator((HashMap) resourceProfilesMap));
+      }
       putTable(req.getAllocationRequestId(), remoteRequestsTable);
     }
     @SuppressWarnings("unchecked")
@@ -806,6 +841,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
 
     if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
       LOG.debug("addResourceRequest:" + " applicationId="
           + " priority=" + priority.getPriority()
           + " resourceName=" + resourceName + " numContainers="
@@ -815,7 +851,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   }
 
   private void decResourceRequest(Priority priority, String resourceName,
-      ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+      ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
     RemoteRequestsTable<T> remoteRequestsTable =
         getTable(req.getAllocationRequestId());
     if (remoteRequestsTable != null) {
@@ -825,7 +861,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
               execTypeReq, capability, req);
       // send the ResourceRequest to RM even if is 0 because it needs to
       // override a previously sent value. If ResourceRequest was not sent
-      // previously then sending 0 ought to be a no-op on RM
+      // previously then sending 0 aught to be a no-op on RM
       if (resourceRequestInfo != null) {
         addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
index 110ca79..135e1db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
@@ -23,7 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -35,43 +35,42 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo;
-import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ProfileCapabilityComparator;
 
 class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
 
   private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class);
 
-  static ResourceReverseMemoryThenCpuComparator resourceComparator =
-      new ResourceReverseMemoryThenCpuComparator();
+  private ProfileCapabilityComparator resourceComparator;
 
   /**
    * Nested Iterator that iterates over just the ResourceRequestInfo
    * object.
    */
   class RequestInfoIterator implements Iterator<ResourceRequestInfo> {
-    private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource,
+    private Iterator<Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>>> iLocMap;
-    private Iterator<Map<ExecutionType, TreeMap<Resource,
+    private Iterator<Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>> iExecTypeMap;
-    private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap;
+    private Iterator<TreeMap<ProfileCapability, ResourceRequestInfo>> iCapMap;
     private Iterator<ResourceRequestInfo> iResReqInfo;
 
     public RequestInfoIterator(Iterator<Map<String,
-        Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>>
+        Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>>>
         iLocationMap) {
       this.iLocMap = iLocationMap;
       if (iLocMap.hasNext()) {
         iExecTypeMap = iLocMap.next().values().iterator();
       } else {
         iExecTypeMap =
-            new LinkedList<Map<ExecutionType, TreeMap<Resource,
+            new LinkedList<Map<ExecutionType, TreeMap<ProfileCapability,
                 ResourceRequestInfo>>>().iterator();
       }
       if (iExecTypeMap.hasNext()) {
         iCapMap = iExecTypeMap.next().values().iterator();
       } else {
         iCapMap =
-            new LinkedList<TreeMap<Resource, ResourceRequestInfo>>()
+            new LinkedList<TreeMap<ProfileCapability, ResourceRequestInfo>>()
                 .iterator();
       }
       if (iCapMap.hasNext()) {
@@ -113,7 +112,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
   // Nest map with Primary key :
   // Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource)
   // and value : ResourceRequestInfo
-  private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource,
+  private Map<Priority, Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
       ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>();
 
   @Override
@@ -122,8 +121,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
   }
 
   ResourceRequestInfo get(Priority priority, String location,
-      ExecutionType execType, Resource capability) {
-    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+      ExecutionType execType, ProfileCapability capability) {
+    TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
         getCapabilityMap(priority, location, execType);
     if (capabilityMap == null) {
       return null;
@@ -131,9 +130,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     return capabilityMap.get(capability);
   }
 
+  @SuppressWarnings("unchecked")
   void put(Priority priority, String resourceName, ExecutionType execType,
-      Resource capability, ResourceRequestInfo resReqInfo) {
-    Map<String, Map<ExecutionType, TreeMap<Resource,
+      ProfileCapability capability, ResourceRequestInfo resReqInfo) {
+    Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>> locationMap =
         remoteRequestsTable.get(priority);
     if (locationMap == null) {
@@ -143,8 +143,8 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
         LOG.debug("Added priority=" + priority);
       }
     }
-    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> execTypeMap =
-        locationMap.get(resourceName);
+    Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
+        execTypeMap = locationMap.get(resourceName);
     if (execTypeMap == null) {
       execTypeMap = new HashMap<>();
       locationMap.put(resourceName, execTypeMap);
@@ -152,9 +152,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
         LOG.debug("Added resourceName=" + resourceName);
       }
     }
-    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+    TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
         execTypeMap.get(execType);
     if (capabilityMap == null) {
+      // this can happen if the user doesn't register with the RM before
+      // calling addResourceRequest
+      if (resourceComparator == null) {
+        resourceComparator = new ProfileCapabilityComparator(new HashMap<>());
+      }
       capabilityMap = new TreeMap<>(resourceComparator);
       execTypeMap.put(execType, capabilityMap);
       if (LOG.isDebugEnabled()) {
@@ -165,9 +170,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
   }
 
   ResourceRequestInfo remove(Priority priority, String resourceName,
-      ExecutionType execType, Resource capability) {
+      ExecutionType execType, ProfileCapability capability) {
     ResourceRequestInfo retVal = null;
-    Map<String, Map<ExecutionType, TreeMap<Resource,
+    Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>> locationMap = remoteRequestsTable.get(priority);
     if (locationMap == null) {
       if (LOG.isDebugEnabled()) {
@@ -175,7 +180,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
       }
       return null;
     }
-    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+    Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
         execTypeMap = locationMap.get(resourceName);
     if (execTypeMap == null) {
       if (LOG.isDebugEnabled()) {
@@ -183,7 +188,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
       }
       return null;
     }
-    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+    TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
         execTypeMap.get(execType);
     if (capabilityMap == null) {
       if (LOG.isDebugEnabled()) {
@@ -204,14 +209,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     return retVal;
   }
 
-  Map<String, Map<ExecutionType, TreeMap<Resource,
+  Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
       ResourceRequestInfo>>> getLocationMap(Priority priority) {
     return remoteRequestsTable.get(priority);
   }
 
-  Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+  Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
       getExecutionTypeMap(Priority priority, String location) {
-    Map<String, Map<ExecutionType, TreeMap<Resource,
+    Map<String, Map<ExecutionType, TreeMap<ProfileCapability,
         ResourceRequestInfo>>> locationMap = getLocationMap(priority);
     if (locationMap == null) {
       return null;
@@ -219,10 +224,10 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     return locationMap.get(location);
   }
 
-  TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(Priority
+  TreeMap<ProfileCapability, ResourceRequestInfo> getCapabilityMap(Priority
       priority, String location,
       ExecutionType execType) {
-    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+    Map<ExecutionType, TreeMap<ProfileCapability, ResourceRequestInfo>>
         executionTypeMap = getExecutionTypeMap(priority, location);
     if (executionTypeMap == null) {
       return null;
@@ -236,7 +241,7 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     List retList = new LinkedList<>();
     for (String location : locations) {
       for (ExecutionType eType : ExecutionType.values()) {
-        TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+        TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
             getCapabilityMap(priority, location, eType);
         if (capabilityMap != null) {
           retList.addAll(capabilityMap.values());
@@ -248,9 +253,9 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
 
   List<ResourceRequestInfo> getMatchingRequests(
       Priority priority, String resourceName, ExecutionType executionType,
-      Resource capability) {
+      ProfileCapability capability) {
     List<ResourceRequestInfo> list = new LinkedList<>();
-    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+    TreeMap<ProfileCapability, ResourceRequestInfo> capabilityMap =
         getCapabilityMap(priority, resourceName, executionType);
     if (capabilityMap != null) {
       ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability);
@@ -266,14 +271,15 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
   @SuppressWarnings("unchecked")
   ResourceRequestInfo addResourceRequest(Long allocationRequestId,
       Priority priority, String resourceName, ExecutionTypeRequest execTypeReq,
-      Resource capability, T req, boolean relaxLocality,
+      ProfileCapability capability, T req, boolean relaxLocality,
       String labelExpression) {
-    ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
-        execTypeReq.getExecutionType(), capability);
+    ResourceRequestInfo resourceRequestInfo =
+        get(priority, resourceName, execTypeReq.getExecutionType(), capability);
     if (resourceRequestInfo == null) {
       resourceRequestInfo =
           new ResourceRequestInfo(allocationRequestId, priority, resourceName,
-              capability, relaxLocality);
+              capability.getProfileCapabilityOverride(), relaxLocality,
+              capability.getProfileName());
       put(priority, resourceName, execTypeReq.getExecutionType(), capability,
           resourceRequestInfo);
     }
@@ -288,11 +294,14 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     if (ResourceRequest.ANY.equals(resourceName)) {
       resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding request to ask " + resourceRequestInfo.remoteRequest);
+    }
     return resourceRequestInfo;
   }
 
   ResourceRequestInfo decResourceRequest(Priority priority, String resourceName,
-      ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+      ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) {
     ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
         execTypeReq.getExecutionType(), capability);
 
@@ -330,4 +339,34 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
     return remoteRequestsTable.isEmpty();
   }
 
+  @SuppressWarnings("unchecked")
+  public void setResourceComparator(ProfileCapabilityComparator comparator) {
+    ProfileCapabilityComparator old = this.resourceComparator;
+    this.resourceComparator = comparator;
+    if (old != null) {
+      // we've already set a resource comparator - re-create the maps with the
+      // new one. this is needed in case someone adds container requests before
+      // registering with the RM. In such a case, the comparator won't have
+      // the resource profiles map. After registration, the map is available
+      // so re-create the capabilities maps
+
+      for (Map.Entry<Priority, Map<String, Map<ExecutionType,
+          TreeMap<ProfileCapability, ResourceRequestInfo>>>>
+          priEntry : remoteRequestsTable.entrySet()) {
+        for (Map.Entry<String, Map<ExecutionType, TreeMap<ProfileCapability,
+            ResourceRequestInfo>>> nameEntry : priEntry.getValue().entrySet()) {
+          for (Map.Entry<ExecutionType, TreeMap<ProfileCapability,
+              ResourceRequestInfo>> execEntry : nameEntry
+              .getValue().entrySet()) {
+            Map<ProfileCapability, ResourceRequestInfo> capabilityMap =
+                execEntry.getValue();
+            TreeMap<ProfileCapability, ResourceRequestInfo> newCapabiltyMap =
+                new TreeMap<>(resourceComparator);
+            newCapabiltyMap.putAll(capabilityMap);
+            execEntry.setValue(newCapabiltyMap);
+          }
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 8772fdc..74f57fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
@@ -135,7 +136,12 @@ public class TestAMRMClient {
     // set the minimum allocation so that resource decrease can go under 1024
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
-    yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
+    startYARNCluster();
+  }
+
+  private static void startYARNCluster() throws Exception {
+    yarnCluster =
+        new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
     yarnCluster.init(conf);
     yarnCluster.start();
 
@@ -158,8 +164,8 @@ public class TestAMRMClient {
 
     node = nodeReports.get(0).getNodeId().getHost();
     rack = nodeReports.get(0).getRackName();
-    nodes = new String[]{ node };
-    racks = new String[]{ rack };
+    nodes = new String[] { node };
+    racks = new String[] { rack };
   }
   
   @Before
@@ -529,7 +535,8 @@ public class TestAMRMClient {
   }
   
   @Test (timeout=60000)
-  public void testAMRMClientMatchingFitInferredRack() throws YarnException, IOException {
+  public void testAMRMClientMatchingFitInferredRack()
+      throws YarnException, IOException {
     AMRMClientImpl<ContainerRequest> amClient = null;
     try {
       // start am rm client
@@ -537,10 +544,10 @@ public class TestAMRMClient {
       amClient.init(conf);
       amClient.start();
       amClient.registerApplicationMaster("Host", 10000, "");
-      
+
       Resource capability = Resource.newInstance(1024, 2);
 
-      ContainerRequest storedContainer1 = 
+      ContainerRequest storedContainer1 =
           new ContainerRequest(capability, nodes, null, priority);
       amClient.addContainerRequest(storedContainer1);
 
@@ -557,14 +564,15 @@ public class TestAMRMClient {
       verifyMatches(matches, 1);
       storedRequest = matches.get(0).iterator().next();
       assertEquals(storedContainer1, storedRequest);
-      
+
       // inferred rack match no longer valid after request is removed
       amClient.removeContainerRequest(storedContainer1);
       matches = amClient.getMatchingRequests(priority, rack, capability);
       assertTrue(matches.isEmpty());
-      
-      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-          null, null);
+
+      amClient
+          .unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, null,
+              null);
 
     } finally {
       if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
@@ -597,16 +605,19 @@ public class TestAMRMClient {
       amClient.addContainerRequest(storedContainer1);
       amClient.addContainerRequest(storedContainer2);
       amClient.addContainerRequest(storedContainer3);
+
+      ProfileCapability profileCapability =
+          ProfileCapability.newInstance(capability);
       
       // test addition and storage
       RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
           amClient.getTable(0);
       int containersRequestedAny = remoteRequestsTable.get(priority,
-          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
           .remoteRequest.getNumContainers();
       assertEquals(2, containersRequestedAny);
       containersRequestedAny = remoteRequestsTable.get(priority1,
-          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
           .remoteRequest.getNumContainers();
          assertEquals(1, containersRequestedAny);
       List<? extends Collection<ContainerRequest>> matches = 
@@ -1320,14 +1331,16 @@ public class TestAMRMClient {
       int expAsks, int expRelease) {
     RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
         amClient.getTable(allocationReqId);
+    ProfileCapability profileCapability =
+        ProfileCapability.newInstance(capability);
     int containersRequestedNode = remoteRequestsTable.get(priority,
-        node, ExecutionType.GUARANTEED, capability).remoteRequest
+        node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
         .getNumContainers();
     int containersRequestedRack = remoteRequestsTable.get(priority,
-        rack, ExecutionType.GUARANTEED, capability).remoteRequest
+        rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
         .getNumContainers();
     int containersRequestedAny = remoteRequestsTable.get(priority,
-        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
         .remoteRequest.getNumContainers();
 
     assertEquals(expNode, containersRequestedNode);
@@ -1534,4 +1547,108 @@ public class TestAMRMClient {
     }
     return result;
   }
+
+  @Test(timeout = 60000)
+  public void testGetMatchingFitWithProfiles() throws Exception {
+    cancelApp();
+    tearDown();
+    conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
+    startYARNCluster();
+    startApp();
+    AMRMClient<ContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = AMRMClient.<ContainerRequest>createAMRMClient();
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      ProfileCapability capability1 = ProfileCapability.newInstance("minimum");
+      ProfileCapability capability2 = ProfileCapability.newInstance("default");
+      ProfileCapability capability3 = ProfileCapability.newInstance("maximum");
+      ProfileCapability capability4 = ProfileCapability
+          .newInstance("minimum", Resource.newInstance(2048, 1));
+      ProfileCapability capability5 = ProfileCapability.newInstance("default");
+      ProfileCapability capability6 = ProfileCapability
+          .newInstance("default", Resource.newInstance(2048, 1));
+      // http has the same capabilities as default
+      ProfileCapability capability7 = ProfileCapability.newInstance("http");
+
+      ContainerRequest storedContainer1 =
+          new ContainerRequest(capability1, nodes, racks, priority);
+      ContainerRequest storedContainer2 =
+          new ContainerRequest(capability2, nodes, racks, priority);
+      ContainerRequest storedContainer3 =
+          new ContainerRequest(capability3, nodes, racks, priority);
+      ContainerRequest storedContainer4 =
+          new ContainerRequest(capability4, nodes, racks, priority);
+      ContainerRequest storedContainer5 =
+          new ContainerRequest(capability5, nodes, racks, priority2);
+      ContainerRequest storedContainer6 =
+          new ContainerRequest(capability6, nodes, racks, priority);
+      ContainerRequest storedContainer7 =
+          new ContainerRequest(capability7, nodes, racks, priority);
+
+
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer2);
+      amClient.addContainerRequest(storedContainer3);
+      amClient.addContainerRequest(storedContainer4);
+      amClient.addContainerRequest(storedContainer5);
+      amClient.addContainerRequest(storedContainer6);
+      amClient.addContainerRequest(storedContainer7);
+
+      // test matching of containers
+      List<? extends Collection<ContainerRequest>> matches;
+      ContainerRequest storedRequest;
+      // exact match
+      ProfileCapability testCapability1 =
+          ProfileCapability.newInstance("minimum");
+      matches = amClient
+          .getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
+              testCapability1);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertEquals(storedContainer1, storedRequest);
+      amClient.removeContainerRequest(storedContainer1);
+
+      // exact matching with order maintained
+      // we should get back 3 matches - default + http because they have the
+      // same capability
+      ProfileCapability testCapability2 =
+          ProfileCapability.newInstance("default");
+      matches = amClient
+          .getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
+              testCapability2);
+      verifyMatches(matches, 2);
+      // must be returned in the order they were made
+      int i = 0;
+      for (ContainerRequest storedRequest1 : matches.get(0)) {
+        switch(i) {
+        case 0:
+          assertEquals(storedContainer2, storedRequest1);
+          break;
+        case 1:
+          assertEquals(storedContainer7, storedRequest1);
+          break;
+        }
+        i++;
+      }
+      amClient.removeContainerRequest(storedContainer5);
+
+      // matching with larger container. all requests returned
+      Resource testCapability3 = Resource.newInstance(8192, 8);
+      matches = amClient
+          .getMatchingRequests(priority, node, testCapability3);
+      assertEquals(3, matches.size());
+
+      Resource testCapability4 = Resource.newInstance(2048, 1);
+      matches = amClient.getMatchingRequests(priority, node, testCapability4);
+      assertEquals(1, matches.size());
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
index ad18da3..53e70ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -274,9 +275,10 @@ public class TestAMRMClientContainerRequest {
       AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
       String location, boolean expectedRelaxLocality,
       ExecutionType executionType) {
-    ResourceRequest ask = client.getTable(0)
-        .get(request.getPriority(), location, executionType,
-            request.getCapability()).remoteRequest;
+    ProfileCapability profileCapability = ProfileCapability
+        .newInstance(request.getResourceProfile(), request.getCapability());
+    ResourceRequest ask = client.getTable(0).get(request.getPriority(),
+        location, executionType, profileCapability).remoteRequest;
     assertEquals(location, ask.getResourceName());
     assertEquals(1, ask.getNumContainers());
     assertEquals(expectedRelaxLocality, ask.getRelaxLocality());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index b552d19..3d2b7a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
@@ -388,18 +389,21 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
 
       RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
           amClient.getTable(0);
+      ProfileCapability profileCapability =
+          ProfileCapability.newInstance(capability);
+
       int containersRequestedNode = remoteRequestsTable.get(priority,
-          node, ExecutionType.GUARANTEED, capability).remoteRequest
+          node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
           .getNumContainers();
       int containersRequestedRack = remoteRequestsTable.get(priority,
-          rack, ExecutionType.GUARANTEED, capability).remoteRequest
+          rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
           .getNumContainers();
       int containersRequestedAny = remoteRequestsTable.get(priority,
-          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
           .remoteRequest.getNumContainers();
       int oppContainersRequestedAny =
           remoteRequestsTable.get(priority2, ResourceRequest.ANY,
-              ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+              ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
               .getNumContainers();
 
       assertEquals(2, containersRequestedNode);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index d211d6d..634541b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -253,9 +254,11 @@ public class TestNMClient {
           racks, priority));
     }
 
+    ProfileCapability profileCapability =
+        ProfileCapability.newInstance(capability);
     int containersRequestedAny = rmClient.getTable(0)
         .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
-            capability).remoteRequest.getNumContainers();
+            profileCapability).remoteRequest.getNumContainers();
 
     // RM should allocate container within 2 calls to allocate()
     int allocatedContainerCount = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
index 802c207..d11aac4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocation.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ProfileCapability;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
@@ -284,18 +285,20 @@ public class TestOpportunisticContainerAllocation {
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
 
+    ProfileCapability profileCapability =
+        ProfileCapability.newInstance(capability);
     int containersRequestedNode = amClient.getTable(0).get(priority,
-        node, ExecutionType.GUARANTEED, capability).remoteRequest
+        node, ExecutionType.GUARANTEED, profileCapability).remoteRequest
         .getNumContainers();
     int containersRequestedRack = amClient.getTable(0).get(priority,
-        rack, ExecutionType.GUARANTEED, capability).remoteRequest
+        rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest
         .getNumContainers();
     int containersRequestedAny = amClient.getTable(0).get(priority,
-        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability)
         .remoteRequest.getNumContainers();
     int oppContainersRequestedAny =
         amClient.getTable(0).get(priority2, ResourceRequest.ANY,
-            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
             .getNumContainers();
 
     assertEquals(2, containersRequestedNode);
@@ -413,9 +416,11 @@ public class TestOpportunisticContainerAllocation {
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true)));
 
+    ProfileCapability profileCapability =
+        ProfileCapability.newInstance(capability);
     int oppContainersRequestedAny =
         amClient.getTable(0).get(priority, ResourceRequest.ANY,
-            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+            ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest
             .getNumContainers();
 
     assertEquals(2, oppContainersRequestedAny);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json
new file mode 100644
index 0000000..d0f3f72
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/resources/resource-profiles.json
@@ -0,0 +1,18 @@
+{
+    "minimum": {
+        "memory-mb" : 1024,
+        "vcores" : 1
+    },
+    "default" : {
+        "memory-mb" : 2048,
+        "vcores" : 2
+    },
+    "maximum" : {
+        "memory-mb": 4096,
+        "vcores" : 4
+    },
+    "http" : {
+        "memory-mb" : 2048,
+        "vcores" : 2
+    }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
index a95aadf..626e05d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfilesProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProfileEntry;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@@ -59,6 +61,7 @@ public class RegisterApplicationMasterResponsePBImpl extends
   private List<Container> containersFromPreviousAttempts = null;
   private List<NMToken> nmTokens = null;
   private EnumSet<SchedulerResourceTypes> schedulerResourceTypes = null;
+  private Map<String, Resource> profiles = null;
 
   public RegisterApplicationMasterResponsePBImpl() {
     builder = RegisterApplicationMasterResponseProto.newBuilder();
@@ -123,6 +126,9 @@ public class RegisterApplicationMasterResponsePBImpl extends
     if(schedulerResourceTypes != null) {
       addSchedulerResourceTypes();
     }
+    if (profiles != null) {
+      addResourceProfiles();
+    }
   }
 
 
@@ -433,6 +439,58 @@ public class RegisterApplicationMasterResponsePBImpl extends
     this.schedulerResourceTypes.addAll(types);
   }
 
+  private void addResourceProfiles() {
+    maybeInitBuilder();
+    builder.clearResourceProfiles();
+    if (profiles == null) {
+      return;
+    }
+    ResourceProfilesProto.Builder profilesBuilder =
+        ResourceProfilesProto.newBuilder();
+    for (Map.Entry<String, Resource> entry : profiles.entrySet()) {
+      ResourceProfileEntry.Builder entryBuilder =
+          ResourceProfileEntry.newBuilder();
+      entryBuilder.setName(entry.getKey());
+      entryBuilder.setResources(convertToProtoFormat(entry.getValue()));
+      profilesBuilder.addResourceProfilesMap(entryBuilder.build());
+    }
+    builder.setResourceProfiles(profilesBuilder.build());
+  }
+
+  private void initResourceProfiles() {
+    if (this.profiles != null) {
+      return;
+    }
+    this.profiles = new HashMap<>();
+    RegisterApplicationMasterResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+
+    if (p.hasResourceProfiles()) {
+      ResourceProfilesProto profilesProto = p.getResourceProfiles();
+      for (ResourceProfileEntry entry : profilesProto
+          .getResourceProfilesMapList()) {
+        this.profiles
+            .put(entry.getName(), convertFromProtoFormat(entry.getResources()));
+      }
+    }
+  }
+
+  @Override
+  public Map<String, Resource> getResourceProfiles() {
+    initResourceProfiles();
+    return this.profiles;
+  }
+
+  @Override
+  public void setResourceProfiles(Map<String, Resource> profilesMap) {
+    if (profilesMap == null) {
+      return;
+    }
+    initResourceProfiles();
+    this.profiles.clear();
+    this.profiles.putAll(profilesMap);
+  }
+
   private Resource convertFromProtoFormat(ResourceProto resource) {
     return new ResourcePBImpl(resource);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb6e4f08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
index 236a763..da07539 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
@@ -130,8 +130,8 @@ public class ResourcePBImpl extends Resource {
               ResourceTypes.COUNTABLE;
       String units = entry.hasUnits() ? entry.getUnits() : "";
       Long value = entry.hasValue() ? entry.getValue() : 0L;
-      ResourceInformation ri =
-          ResourceInformation.newInstance(entry.getKey(), units, value, type);
+      ResourceInformation ri = ResourceInformation
+          .newInstance(entry.getKey(), units, value, type, 0L, Long.MAX_VALUE);
       if (resources.containsKey(ri.getName())) {
         resources.get(ri.getName()).setResourceType(ri.getResourceType());
         resources.get(ri.getName()).setUnits(ri.getUnits());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org