You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2016/08/27 00:17:37 UTC

[6/8] hadoop git commit: YARN-4889. Changes in AMRMClient for identifying resource-requests explicitly. (Arun Suresh via wangda)

YARN-4889. Changes in AMRMClient for identifying resource-requests explicitly. (Arun Suresh via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/19c743c1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/19c743c1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/19c743c1

Branch: refs/heads/HADOOP-13341
Commit: 19c743c1bbcaf3df8f1d63e557143c960a538c42
Parents: b930dc3
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Aug 26 16:48:00 2016 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Aug 26 17:14:12 2016 -0700

----------------------------------------------------------------------
 .../yarn/api/records/ResourceRequest.java       |  19 +-
 .../hadoop/yarn/client/api/AMRMClient.java      | 132 +++++++++++-
 .../yarn/client/api/async/AMRMClientAsync.java  |  18 ++
 .../yarn/client/api/impl/AMRMClientImpl.java    | 163 ++++++++++-----
 .../client/api/impl/RemoteRequestsTable.java    |  11 +-
 .../yarn/client/api/impl/TestAMRMClient.java    | 200 ++++++++++++++++---
 .../impl/TestAMRMClientContainerRequest.java    |   7 +-
 .../api/impl/TestDistributedScheduling.java     |  24 ++-
 .../yarn/client/api/impl/TestNMClient.java      |   6 +-
 9 files changed, 468 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 07f132c..2d6f0f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -112,6 +112,10 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
       // Compare priority, host and capability
       int ret = r1.getPriority().compareTo(r2.getPriority());
       if (ret == 0) {
+        ret = Long.compare(
+            r1.getAllocationRequestId(), r2.getAllocationRequestId());
+      }
+      if (ret == 0) {
         String h1 = r1.getResourceName();
         String h2 = r2.getResourceName();
         ret = h1.compareTo(h2);
@@ -381,6 +385,7 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
     result = prime * result + ((hostName == null) ? 0 : hostName.hashCode());
     result = prime * result + getNumContainers();
     result = prime * result + ((priority == null) ? 0 : priority.hashCode());
+    result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode();
     return result;
   }
 
@@ -422,6 +427,11 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
         .equals(other.getExecutionTypeRequest().getExecutionType())) {
       return false;
     }
+
+    if (getAllocationRequestId() != other.getAllocationRequestId()) {
+      return false;
+    }
+
     if (getNodeLabelExpression() == null) {
       if (other.getNodeLabelExpression() != null) {
         return false;
@@ -452,7 +462,14 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
           int capabilityComparison =
               this.getCapability().compareTo(other.getCapability());
           if (capabilityComparison == 0) {
-            return this.getNumContainers() - other.getNumContainers();
+            int numContainerComparison =
+                this.getNumContainers() - other.getNumContainers();
+            if (numContainerComparison == 0) {
+              return Long.compare(getAllocationRequestId(),
+                  other.getAllocationRequestId());
+            } else {
+              return numContainerComparison;
+            }
           } else {
             return capabilityComparison;
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 79d587a..2990c05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -110,6 +110,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     final List<String> nodes;
     final List<String> racks;
     final Priority priority;
+    final long allocationRequestId;
     final boolean relaxLocality;
     final String nodeLabelsExpression;
     final ExecutionTypeRequest executionTypeRequest;
@@ -134,6 +135,31 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
         String[] racks, Priority priority) {
       this(capability, nodes, racks, priority, true, null);
     }
+
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints and
+     * locality relaxation enabled.
+     *
+     * @param capability
+     *          The {@link Resource} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     * @param allocationRequestId Allocation Request Id
+     */
+    @Public
+    @InterfaceStability.Evolving
+    public ContainerRequest(Resource capability, String[] nodes,
+        String[] racks, Priority priority, long allocationRequestId) {
+      this(capability, nodes, racks, priority, allocationRequestId, true, null,
+          ExecutionTypeRequest.newInstance());
+    }
     
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints.
@@ -175,16 +201,79 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      * @param relaxLocality
      *          If true, containers for this request may be assigned on hosts
      *          and racks other than the ones explicitly requested.
+     * @param allocationRequestId Allocation Request Id
+     */
+    @Public
+    @InterfaceStability.Evolving
+    public ContainerRequest(Resource capability, String[] nodes,
+        String[] racks, Priority priority, long allocationRequestId,
+        boolean relaxLocality) {
+      this(capability, nodes, racks, priority, allocationRequestId,
+          relaxLocality, null, ExecutionTypeRequest.newInstance());
+    }
+
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints.
+     *
+     * @param capability
+     *          The {@link Resource} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     * @param relaxLocality
+     *          If true, containers for this request may be assigned on hosts
+     *          and racks other than the ones explicitly requested.
      * @param nodeLabelsExpression
      *          Set node labels to allocate resource, now we only support
      *          asking for only a single node label
      */
     public ContainerRequest(Resource capability, String[] nodes, String[] racks,
         Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
-      this(capability, nodes, racks, priority, relaxLocality,
+      this(capability, nodes, racks, priority, 0, relaxLocality,
           nodeLabelsExpression,
           ExecutionTypeRequest.newInstance());
     }
+
+    /**
+     * Instantiates a {@link ContainerRequest} with the given constraints.
+     *
+     * @param capability
+     *          The {@link Resource} to be requested for each container.
+     * @param nodes
+     *          Any hosts to request that the containers are placed on.
+     * @param racks
+     *          Any racks to request that the containers are placed on. The
+     *          racks corresponding to any hosts requested will be automatically
+     *          added to this list.
+     * @param priority
+     *          The priority at which to request the containers. Higher
+     *          priorities have lower numerical values.
+     * @param allocationRequestId
+     *          The allocationRequestId of the request. To be used as a tracking
+     *          id to match Containers allocated against this request. Will
+     *          default to 0 if not specified.
+     * @param relaxLocality
+     *          If true, containers for this request may be assigned on hosts
+     *          and racks other than the ones explicitly requested.
+     * @param nodeLabelsExpression
+     *          Set node labels to allocate resource, now we only support
+     *          asking for only a single node label
+     */
+    @Public
+    @InterfaceStability.Evolving
+    public ContainerRequest(Resource capability, String[] nodes, String[] racks,
+        Priority priority, long allocationRequestId, boolean relaxLocality,
+        String nodeLabelsExpression) {
+      this(capability, nodes, racks, priority, allocationRequestId,
+          relaxLocality, nodeLabelsExpression,
+          ExecutionTypeRequest.newInstance());
+    }
           
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints.
@@ -200,6 +289,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      * @param priority
      *          The priority at which to request the containers. Higher
      *          priorities have lower numerical values.
+     * @param allocationRequestId
+     *          The allocationRequestId of the request. To be used as a tracking
+     *          id to match Containers allocated against this request. Will
+     *          default to 0 if not specified.
      * @param relaxLocality
      *          If true, containers for this request may be assigned on hosts
      *          and racks other than the ones explicitly requested.
@@ -210,7 +303,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      *          Set the execution type of the container request.
      */
     public ContainerRequest(Resource capability, String[] nodes, String[] racks,
-        Priority priority, boolean relaxLocality, String nodeLabelsExpression,
+        Priority priority, long allocationRequestId, boolean relaxLocality,
+        String nodeLabelsExpression,
         ExecutionTypeRequest executionTypeRequest) {
       // Validate request
       Preconditions.checkArgument(capability != null,
@@ -223,6 +317,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
                   && (nodes == null || nodes.length == 0)),
               "Can't turn off locality relaxation on a " + 
               "request with no location constraints");
+      this.allocationRequestId = allocationRequestId;
       this.capability = capability;
       this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
       this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
@@ -247,6 +342,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     public Priority getPriority() {
       return priority;
     }
+
+    public long getAllocationRequestId() {
+      return allocationRequestId;
+    }
     
     public boolean getRelaxLocality() {
       return relaxLocality;
@@ -264,6 +363,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       StringBuilder sb = new StringBuilder();
       sb.append("Capability[").append(capability).append("]");
       sb.append("Priority[").append(priority).append("]");
+      sb.append("AllocationRequestId[").append(allocationRequestId).append("]");
       sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
           .append("]");
       return sb.toString();
@@ -390,6 +490,10 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    * Each collection in the list contains requests with identical 
    * <code>Resource</code> size that fit in the given capability. In a 
    * collection, requests will be returned in the same order as they were added.
+   *
+   * NOTE: This API only matches Container requests that were created by the
+   * client WITHOUT the allocationRequestId being set.
+   *
    * @return Collection of request matching the parameters
    */
   @InterfaceStability.Evolving
@@ -407,7 +511,11 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    * Each collection in the list contains requests with identical
    * <code>Resource</code> size that fit in the given capability. In a
    * collection, requests will be returned in the same order as they were added.
-   * specify an <code>ExecutionType</code> .
+   * specify an <code>ExecutionType</code>.
+   *
+   * NOTE: This API only matches Container requests that were created by the
+   * client WITHOUT the allocationRequestId being set.
+   *
    * @param priority Priority
    * @param resourceName Location
    * @param executionType ExecutionType
@@ -421,7 +529,23 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     throw new UnsupportedOperationException("The sub-class extending" +
         " AMRMClient is expected to implement this !!");
   }
-  
+
+  /**
+   * Get outstanding <code>ContainerRequest</code>s matching the given
+   * allocationRequestId. These ContainerRequests should have been added via
+   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
+   * the AMRMClient may return its internal collection directly without creating
+   * a copy. Users should not perform mutable operations on the return value.
+   *
+   * NOTE: This API only matches Container requests that were created by the
+   * client WITH the allocationRequestId being set to a non-default value.
+   *
+   * @param allocationRequestId Allocation Request Id
+   * @return Collection of request matching the parameters
+   */
+  @InterfaceStability.Evolving
+  public abstract Collection<T> getMatchingRequests(long allocationRequestId);
+
   /**
    * Update application's blacklist with addition or removal resources.
    * 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index d06fadc..10d2a2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -202,6 +202,10 @@ extends AbstractService {
   /**
    * Returns all matching ContainerRequests that match the given Priority,
    * ResourceName, ExecutionType and Capability.
+   *
+   * NOTE: This matches only requests that were made by the client WITHOUT the
+   * allocationRequestId specified.
+   *
    * @param priority Priority.
    * @param resourceName Location.
    * @param executionType ExecutionType.
@@ -214,6 +218,20 @@ extends AbstractService {
     return client.getMatchingRequests(priority, resourceName,
         executionType, capability);
   }
+
+  /**
+   * Returns all matching ContainerRequests that match the given
+   * AllocationRequestId.
+   *
+   * NOTE: This matches only requests that were made by the client WITH the
+   * allocationRequestId specified.
+   *
+   * @param allocationRequestId AllocationRequestId.
+   * @return All matching ContainerRequests
+   */
+  public Collection<T> getMatchingRequests(long allocationRequestId) {
+    return client.getMatchingRequests(allocationRequestId);
+  }
   
   /**
    * Registers this application master with the resource manager. On successful

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 4145944..60834f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -108,10 +108,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     ResourceRequest remoteRequest;
     LinkedHashSet<T> containerRequests;
     
-    ResourceRequestInfo(Priority priority, String resourceName,
-        Resource capability, boolean relaxLocality) {
+    ResourceRequestInfo(Long allocationRequestId, Priority priority,
+        String resourceName, Resource capability, boolean relaxLocality) {
       remoteRequest = ResourceRequest.newInstance(priority, resourceName,
           capability, 0);
+      remoteRequest.setAllocationRequestId(allocationRequestId);
       remoteRequest.setRelaxLocality(relaxLocality);
       containerRequests = new LinkedHashSet<T>();
     }
@@ -154,7 +155,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     return (mem0 <= mem1 && cpu0 <= cpu1);
   }
 
-  final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable<T>();
+  private final Map<Long, RemoteRequestsTable<T>> remoteRequests =
+      new HashMap<>();
 
   protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
@@ -263,10 +265,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         for(ResourceRequest r : ask) {
           // create a copy of ResourceRequest as we might change it while the 
           // RPC layer is using it to send info across
-          askList.add(ResourceRequest.newInstance(r.getPriority(),
+          ResourceRequest rr = ResourceRequest.newInstance(r.getPriority(),
               r.getResourceName(), r.getCapability(), r.getNumContainers(),
               r.getRelaxLocality(), r.getNodeLabelExpression(),
-              r.getExecutionTypeRequest()));
+              r.getExecutionTypeRequest());
+          rr.setAllocationRequestId(r.getAllocationRequestId());
+          askList.add(rr);
         }
         List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
         List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
@@ -318,11 +322,14 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         synchronized (this) {
           release.addAll(this.pendingRelease);
           blacklistAdditions.addAll(this.blacklistedNodes);
-          @SuppressWarnings("unchecked")
-          Iterator<ResourceRequestInfo<T>> reqIter =
-              remoteRequestsTable.iterator();
-          while (reqIter.hasNext()) {
-            addResourceRequestToAsk(reqIter.next().remoteRequest);
+          for (RemoteRequestsTable remoteRequestsTable :
+              remoteRequests.values()) {
+            @SuppressWarnings("unchecked")
+            Iterator<ResourceRequestInfo<T>> reqIter =
+                remoteRequestsTable.iterator();
+            while (reqIter.hasNext()) {
+              addResourceRequestToAsk(reqIter.next().remoteRequest);
+            }
           }
           change.putAll(this.pendingChange);
         }
@@ -498,15 +505,16 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
 
     // check that specific and non-specific requests cannot be mixed within a
     // priority
-    checkLocalityRelaxationConflict(req.getPriority(), ANY_LIST,
-        req.getRelaxLocality());
+    checkLocalityRelaxationConflict(req.getAllocationRequestId(),
+        req.getPriority(), ANY_LIST, req.getRelaxLocality());
     // check that specific rack cannot be mixed with specific node within a 
     // priority. If node and its rack are both specified then they must be 
     // in the same request.
     // For explicitly requested racks, we set locality relaxation to true
-    checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true);
-    checkLocalityRelaxationConflict(req.getPriority(), inferredRacks,
-        req.getRelaxLocality());
+    checkLocalityRelaxationConflict(req.getAllocationRequestId(),
+        req.getPriority(), dedupedRacks, true);
+    checkLocalityRelaxationConflict(req.getAllocationRequestId(),
+        req.getPriority(), inferredRacks, req.getRelaxLocality());
     // check if the node label expression specified is valid
     checkNodeLabelExpression(req);
 
@@ -607,6 +615,24 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     return clusterNodeCount;
   }
 
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Collection<T> getMatchingRequests(long allocationRequestId) {
+    RemoteRequestsTable remoteRequestsTable = getTable(allocationRequestId);
+    LinkedHashSet<T> list = new LinkedHashSet<>();
+
+    if (remoteRequestsTable != null) {
+      Iterator<ResourceRequestInfo<T>> reqIter =
+          remoteRequestsTable.iterator();
+      while (reqIter.hasNext()) {
+        ResourceRequestInfo<T> resReqInfo = reqIter.next();
+        list.addAll(resReqInfo.containerRequests);
+      }
+    }
+    return list;
+  }
+
   @Override
   public synchronized List<? extends Collection<T>> getMatchingRequests(
       Priority priority,
@@ -617,6 +643,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public synchronized List<? extends Collection<T>> getMatchingRequests(
       Priority priority, String resourceName, ExecutionType executionType,
       Resource capability) {
@@ -626,9 +653,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         "The priority at which to request containers should not be null ");
     List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
 
-    @SuppressWarnings("unchecked")
+    RemoteRequestsTable remoteRequestsTable = getTable(0);
     List<ResourceRequestInfo<T>> matchingRequests =
-        this.remoteRequestsTable.getMatchingRequests(priority, resourceName,
+        remoteRequestsTable.getMatchingRequests(priority, resourceName,
             executionType, capability);
     // If no exact match. Container may be larger than what was requested.
     // get all resources <= capability. map is reverse sorted.
@@ -664,23 +691,26 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
    * ContainerRequests with locality relaxation cannot be made at the same
    * priority as ContainerRequests without locality relaxation.
    */
-  private void checkLocalityRelaxationConflict(Priority priority,
-      Collection<String> locations, boolean relaxLocality) {
+  private void checkLocalityRelaxationConflict(Long allocationReqId,
+      Priority priority, Collection<String> locations, boolean relaxLocality) {
     // Locality relaxation will be set to relaxLocality for all implicitly
     // requested racks. Make sure that existing rack requests match this.
 
-    @SuppressWarnings("unchecked")
-    List<ResourceRequestInfo> allCapabilityMaps =
-        remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
-    for (ResourceRequestInfo reqs : allCapabilityMaps) {
-      ResourceRequest remoteRequest = reqs.remoteRequest;
-      boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
-      if (relaxLocality != existingRelaxLocality) {
-        throw new InvalidContainerRequestException("Cannot submit a "
-            + "ContainerRequest asking for location "
-            + remoteRequest.getResourceName() + " with locality relaxation "
-            + relaxLocality + " when it has already been requested"
-            + "with locality relaxation " + existingRelaxLocality);
+    RemoteRequestsTable<T> remoteRequestsTable = getTable(allocationReqId);
+    if (remoteRequestsTable != null) {
+      @SuppressWarnings("unchecked")
+      List<ResourceRequestInfo> allCapabilityMaps =
+          remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
+      for (ResourceRequestInfo reqs : allCapabilityMaps) {
+        ResourceRequest remoteRequest = reqs.remoteRequest;
+        boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
+        if (relaxLocality != existingRelaxLocality) {
+          throw new InvalidContainerRequestException("Cannot submit a "
+              + "ContainerRequest asking for location "
+              + remoteRequest.getResourceName() + " with locality relaxation "
+              + relaxLocality + " when it has already been requested"
+              + "with locality relaxation " + existingRelaxLocality);
+        }
       }
     }
   }
@@ -742,10 +772,17 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   private void addResourceRequest(Priority priority, String resourceName,
       ExecutionTypeRequest execTypeReq, Resource capability, T req,
       boolean relaxLocality, String labelExpression) {
+    RemoteRequestsTable<T> remoteRequestsTable =
+        getTable(req.getAllocationRequestId());
+    if (remoteRequestsTable == null) {
+      remoteRequestsTable = new RemoteRequestsTable<T>();
+      putTable(req.getAllocationRequestId(), remoteRequestsTable);
+    }
     @SuppressWarnings("unchecked")
     ResourceRequestInfo resourceRequestInfo = remoteRequestsTable
-        .addResourceRequest(priority, resourceName,
-        execTypeReq, capability, req, relaxLocality, labelExpression);
+        .addResourceRequest(req.getAllocationRequestId(), priority,
+            resourceName, execTypeReq, capability, req, relaxLocality,
+            labelExpression);
 
     // Note this down for next interaction with ResourceManager
     addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
@@ -761,29 +798,37 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
 
   private void decResourceRequest(Priority priority, String resourceName,
       ExecutionTypeRequest execTypeReq, Resource capability, T req) {
-    @SuppressWarnings("unchecked")
-    ResourceRequestInfo resourceRequestInfo =
-        remoteRequestsTable.decResourceRequest(priority, resourceName,
-            execTypeReq, capability, req);
-    // send the ResourceRequest to RM even if is 0 because it needs to override
-    // a previously sent value. If ResourceRequest was not sent previously then
-    // sending 0 aught to be a no-op on RM
-    if (resourceRequestInfo != null) {
-      addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
-
-      // delete entry from map if no longer needed
-      if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
-        this.remoteRequestsTable.remove(priority, resourceName,
-            execTypeReq.getExecutionType(), capability);
-      }
+    RemoteRequestsTable<T> remoteRequestsTable =
+        getTable(req.getAllocationRequestId());
+    if (remoteRequestsTable != null) {
+      @SuppressWarnings("unchecked")
+      ResourceRequestInfo resourceRequestInfo =
+          remoteRequestsTable.decResourceRequest(priority, resourceName,
+              execTypeReq, capability, req);
+      // send the ResourceRequest to RM even if is 0 because it needs to
+      // override a previously sent value. If ResourceRequest was not sent
+      // previously then sending 0 ought to be a no-op on RM
+      if (resourceRequestInfo != null) {
+        addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+
+        // delete entry from map if no longer needed
+        if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
+          remoteRequestsTable.remove(priority, resourceName,
+              execTypeReq.getExecutionType(), capability);
+        }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AFTER decResourceRequest:" + " applicationId="
-            + " priority=" + priority.getPriority()
-            + " resourceName=" + resourceName + " numContainers="
-            + resourceRequestInfo.remoteRequest.getNumContainers()
-            + " #asks=" + ask.size());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("AFTER decResourceRequest:"
+              + " allocationRequestId=" + req.getAllocationRequestId()
+              + " priority=" + priority.getPriority()
+              + " resourceName=" + resourceName + " numContainers="
+              + resourceRequestInfo.remoteRequest.getNumContainers()
+              + " #asks=" + ask.size());
+        }
       }
+    } else {
+      LOG.info("No remoteRequestTable found with allocationRequestId="
+          + req.getAllocationRequestId());
     }
   }
 
@@ -829,4 +874,14 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     currentUGI.addToken(amrmToken);
     amrmToken.setService(ClientRMProxy.getAMRMTokenService(getConfig()));
   }
+
+  @VisibleForTesting
+  RemoteRequestsTable<T> getTable(long allocationRequestId) {
+    return remoteRequests.get(Long.valueOf(allocationRequestId));
+  }
+
+  RemoteRequestsTable<T> putTable(long allocationRequestId,
+      RemoteRequestsTable<T> table) {
+    return remoteRequests.put(Long.valueOf(allocationRequestId), table);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
index 853a512..110ca79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
@@ -264,15 +264,16 @@ class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
   }
 
   @SuppressWarnings("unchecked")
-  ResourceRequestInfo addResourceRequest(Priority priority, String resourceName,
-      ExecutionTypeRequest execTypeReq, Resource capability, T req,
-      boolean relaxLocality, String labelExpression) {
+  ResourceRequestInfo addResourceRequest(Long allocationRequestId,
+      Priority priority, String resourceName, ExecutionTypeRequest execTypeReq,
+      Resource capability, T req, boolean relaxLocality,
+      String labelExpression) {
     ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
         execTypeReq.getExecutionType(), capability);
     if (resourceRequestInfo == null) {
       resourceRequestInfo =
-          new ResourceRequestInfo(priority, resourceName, capability,
-              relaxLocality);
+          new ResourceRequestInfo(allocationRequestId, priority, resourceName,
+              capability, relaxLocality);
       put(priority, resourceName, execTypeReq.getExecutionType(), capability,
           resourceRequestInfo);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 1eeeb78..e0ad2c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -268,6 +269,18 @@ public class TestAMRMClient {
       amClient.addContainerRequest(storedContainer5);
       amClient.addContainerRequest(storedContainer6);
       amClient.addContainerRequest(storedContainer7);
+
+      // Add some CRs with allocReqIds... These will not be returned by
+      // the default getMatchingRequests
+      ContainerRequest storedContainer11 =
+          new ContainerRequest(capability1, nodes, racks, priority, 1);
+      ContainerRequest storedContainer33 =
+          new ContainerRequest(capability3, nodes, racks, priority, 3);
+      ContainerRequest storedContainer43 =
+          new ContainerRequest(capability4, nodes, racks, priority, 3);
+      amClient.addContainerRequest(storedContainer11);
+      amClient.addContainerRequest(storedContainer33);
+      amClient.addContainerRequest(storedContainer43);
       
       // test matching of containers
       List<? extends Collection<ContainerRequest>> matches;
@@ -279,6 +292,25 @@ public class TestAMRMClient {
       storedRequest = matches.get(0).iterator().next();
       assertEquals(storedContainer1, storedRequest);
       amClient.removeContainerRequest(storedContainer1);
+
+      // exact match for allocReqId 1
+      Collection<ContainerRequest> reqIdMatches =
+          amClient.getMatchingRequests(1);
+      assertEquals(1, reqIdMatches.size());
+      storedRequest = reqIdMatches.iterator().next();
+      assertEquals(storedContainer11, storedRequest);
+      amClient.removeContainerRequest(storedContainer11);
+
+      // exact match for allocReqId 3
+      reqIdMatches = amClient.getMatchingRequests(3);
+      assertEquals(2, reqIdMatches.size());
+      Iterator<ContainerRequest> iter = reqIdMatches.iterator();
+      storedRequest = iter.next();
+      assertEquals(storedContainer43, storedRequest);
+      amClient.removeContainerRequest(storedContainer43);
+      storedRequest = iter.next();
+      assertEquals(storedContainer33, storedRequest);
+      amClient.removeContainerRequest(storedContainer33);
       
       // exact matching with order maintained
       Resource testCapability2 = Resource.newInstance(2000, 1);
@@ -364,26 +396,32 @@ public class TestAMRMClient {
       ContainerRequest storedGuarContainer2 =
           new ContainerRequest(capability2, nodes, racks, priority);
       ContainerRequest storedOpportContainer1 =
-          new ContainerRequest(capability1, nodes, racks, priority, true, null,
+          new ContainerRequest(capability1, nodes, racks, priority,
+              0, true, null,
               ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
       ContainerRequest storedOpportContainer2 =
-          new ContainerRequest(capability2, nodes, racks, priority, true, null,
+          new ContainerRequest(capability2, nodes, racks, priority,
+              0, true, null,
               ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
       ContainerRequest storedOpportContainer3 =
-          new ContainerRequest(capability3, nodes, racks, priority, true, null,
+          new ContainerRequest(capability3, nodes, racks, priority,
+              0, true, null,
               ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
       ContainerRequest storedOpportContainer4 =
-          new ContainerRequest(capability4, nodes, racks, priority, true, null,
+          new ContainerRequest(capability4, nodes, racks, priority,
+              0, true, null,
               ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
       ContainerRequest storedOpportContainer5 =
-          new ContainerRequest(capability5, nodes, racks, priority, true, null,
+          new ContainerRequest(capability5, nodes, racks, priority,
+              0, true, null,
               ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
       ContainerRequest storedOpportContainer6 =
-          new ContainerRequest(capability6, nodes, racks, priority, true, null,
+          new ContainerRequest(capability6, nodes, racks, priority,
+              0, true, null,
               ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
       ContainerRequest storedOpportContainer7 =
           new ContainerRequest(capability7, nodes, racks, priority2,
-              false, null,
+              0, false, null,
               ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
       amClient.addContainerRequest(storedGuarContainer1);
       amClient.addContainerRequest(storedGuarContainer2);
@@ -541,11 +579,13 @@ public class TestAMRMClient {
       amClient.addContainerRequest(storedContainer3);
       
       // test addition and storage
-      int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+      RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
+          amClient.getTable(0);
+      int containersRequestedAny = remoteRequestsTable.get(priority,
           ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
           .remoteRequest.getNumContainers();
       assertEquals(2, containersRequestedAny);
-      containersRequestedAny = amClient.remoteRequestsTable.get(priority1,
+      containersRequestedAny = remoteRequestsTable.get(priority1,
           ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
           .remoteRequest.getNumContainers();
          assertEquals(1, containersRequestedAny);
@@ -584,7 +624,7 @@ public class TestAMRMClient {
           amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
       assertTrue(matches.isEmpty());
       // 0 requests left. everything got cleaned up
-      assertTrue(amClient.remoteRequestsTable.isEmpty());
+      assertTrue(amClient.getTable(0).isEmpty());
       
       // go through an exemplary allocation, matching and release cycle
       amClient.addContainerRequest(storedContainer1);
@@ -628,7 +668,7 @@ public class TestAMRMClient {
       assertEquals(0, amClient.ask.size());
       assertEquals(0, allocResponse.getAllocatedContainers().size());
       // 0 requests left. everything got cleaned up
-      assertTrue(amClient.remoteRequestsTable.isEmpty());      
+      assertTrue(remoteRequestsTable.isEmpty());
       
       amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
           null, null);
@@ -780,6 +820,16 @@ public class TestAMRMClient {
 
   @Test (timeout=60000)
   public void testAMRMClient() throws YarnException, IOException {
+    initAMRMClientAndTest(false);
+  }
+
+  @Test (timeout=60000)
+  public void testAMRMClientAllocReqId() throws YarnException, IOException {
+    initAMRMClientAndTest(true);
+  }
+
+  private void initAMRMClientAndTest(boolean useAllocReqId)
+      throws YarnException, IOException {
     AMRMClient<ContainerRequest> amClient = null;
     try {
       // start am rm client
@@ -796,7 +846,11 @@ public class TestAMRMClient {
 
       amClient.registerApplicationMaster("Host", 10000, "");
 
-      testAllocation((AMRMClientImpl<ContainerRequest>)amClient);
+      if (useAllocReqId) {
+        testAllocRequestId((AMRMClientImpl<ContainerRequest>)amClient);
+      } else {
+        testAllocation((AMRMClientImpl<ContainerRequest>) amClient);
+      }
 
       amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
           null, null);
@@ -1055,22 +1109,9 @@ public class TestAMRMClient {
         new ContainerRequest(capability, nodes, racks, priority));
     amClient.removeContainerRequest(
         new ContainerRequest(capability, nodes, racks, priority));
-    
-    int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
-        node, ExecutionType.GUARANTEED, capability).remoteRequest
-        .getNumContainers();
-    int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
-        rack, ExecutionType.GUARANTEED, capability).remoteRequest
-        .getNumContainers();
-    int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
-        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
-        .remoteRequest.getNumContainers();
 
-    assertEquals(2, containersRequestedNode);
-    assertEquals(2, containersRequestedRack);
-    assertEquals(2, containersRequestedAny);
-    assertEquals(3, amClient.ask.size());
-    assertEquals(0, amClient.release.size());
+    assertNumContainers(amClient, 0, 2, 2, 2, 3, 0);
+    int containersRequestedAny = 2;
 
     // RM should allocate container within 2 calls to allocate()
     int allocatedContainerCount = 0;
@@ -1163,10 +1204,15 @@ public class TestAMRMClient {
     // verify that the remove request made in between makeRequest and allocate 
     // has not been lost
     assertEquals(0, snoopRequest.getNumContainers());
-    
-    iterationsLeft = 3;
+
+    waitForContainerCompletion(3, amClient, releases);
+  }
+
+  private void waitForContainerCompletion(int numIterations,
+      AMRMClientImpl<ContainerRequest> amClient, Set<ContainerId> releases)
+      throws YarnException, IOException {
     // do a few iterations to ensure RM is not going send new containers
-    while(!releases.isEmpty() || iterationsLeft-- > 0) {
+    while(!releases.isEmpty() || numIterations-- > 0) {
       // inform RM of rejection
       AllocateResponse allocResponse = amClient.allocate(0.1f);
       // RM did not send new containers because AM does not need any
@@ -1181,7 +1227,7 @@ public class TestAMRMClient {
           }
         }
       }
-      if(iterationsLeft > 0) {
+      if(numIterations > 0) {
         // sleep to make sure NM's heartbeat
         sleep(100);
       }
@@ -1190,6 +1236,98 @@ public class TestAMRMClient {
     assertEquals(0, amClient.release.size());
   }
 
+  private void testAllocRequestId(
+      final AMRMClientImpl<ContainerRequest> amClient) throws YarnException,
+      IOException {
+    // setup container request
+
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority));
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 1));
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 1));
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 2));
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 2));
+    amClient.removeContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority));
+    amClient.removeContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 1));
+    amClient.removeContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 2));
+
+    assertNumContainers(amClient, 0, 1, 1, 1, 9, 0);
+    assertNumContainers(amClient, 1, 1, 1, 1, 9, 0);
+    assertNumContainers(amClient, 2, 1, 1, 1, 9, 0);
+    int containersRequestedAny = 3;
+
+    // RM should allocate container within 2 calls to allocate()
+    List<Container> allocatedContainers = new ArrayList<>();
+    int iterationsLeft = 5;
+    Set<ContainerId> releases = new TreeSet<ContainerId>();
+
+    while (allocatedContainers.size() < containersRequestedAny
+        && iterationsLeft-- > 0) {
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+      for(Container container : allocResponse.getAllocatedContainers()) {
+        ContainerId rejectContainerId = container.getId();
+        releases.add(rejectContainerId);
+        amClient.releaseAssignedContainer(rejectContainerId);
+      }
+
+      if(allocatedContainers.size() < containersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(containersRequestedAny, allocatedContainers.size());
+    Set<Long> expAllocIds = new HashSet<>(
+        Arrays.asList(Long.valueOf(0), Long.valueOf(1), Long.valueOf(2)));
+    Set<Long> actAllocIds = new HashSet<>();
+    for (Container ac : allocatedContainers) {
+      actAllocIds.add(Long.valueOf(ac.getAllocationRequestId()));
+    }
+    assertEquals(expAllocIds, actAllocIds);
+    assertEquals(3, amClient.release.size());
+    assertEquals(0, amClient.ask.size());
+
+    waitForContainerCompletion(3, amClient, releases);
+  }
+
+  private void assertNumContainers(AMRMClientImpl<ContainerRequest> amClient,
+      long allocationReqId, int expNode, int expRack, int expAny,
+      int expAsks, int expRelease) {
+    RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
+        amClient.getTable(allocationReqId);
+    int containersRequestedNode = remoteRequestsTable.get(priority,
+        node, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedRack = remoteRequestsTable.get(priority,
+        rack, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedAny = remoteRequestsTable.get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
+
+    assertEquals(expNode, containersRequestedNode);
+    assertEquals(expRack, containersRequestedRack);
+    assertEquals(expAny, containersRequestedAny);
+    assertEquals(expAsks, amClient.ask.size());
+    assertEquals(expRelease, amClient.release.size());
+  }
+
   class CountDownSupplier implements Supplier<Boolean> {
     int counter = 0;
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
index 2db33c1..ad18da3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
@@ -61,7 +61,7 @@ public class TestAMRMClientContainerRequest {
     verifyResourceRequest(client, request, ResourceRequest.ANY, true);
     ContainerRequest request2 =
         new ContainerRequest(capability, new String[] {"host1", "host2"},
-            new String[] {"/rack2"}, Priority.newInstance(1), true, null,
+            new String[] {"/rack2"}, Priority.newInstance(1), 0, true, null,
             ExecutionTypeRequest.newInstance(
                 ExecutionType.OPPORTUNISTIC, true));
     client.addContainerRequest(request2);
@@ -274,8 +274,9 @@ public class TestAMRMClientContainerRequest {
       AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
       String location, boolean expectedRelaxLocality,
       ExecutionType executionType) {
-    ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(),
-        location, executionType, request.getCapability()).remoteRequest;
+    ResourceRequest ask = client.getTable(0)
+        .get(request.getPriority(), location, executionType,
+            request.getCapability()).remoteRequest;
     assertEquals(location, ask.getResourceName());
     assertEquals(1, ask.getNumContainers());
     assertEquals(expectedRelaxLocality, ask.getRelaxLocality());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
index 40390d4..4cfc4eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java
@@ -366,12 +366,12 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
           new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
       amClient.addContainerRequest(
           new AMRMClient.ContainerRequest(capability, null, null, priority2,
-              true, null,
+              0, true, null,
               ExecutionTypeRequest.newInstance(
                   ExecutionType.OPPORTUNISTIC, true)));
       amClient.addContainerRequest(
           new AMRMClient.ContainerRequest(capability, null, null, priority2,
-              true, null,
+              0, true, null,
               ExecutionTypeRequest.newInstance(
                   ExecutionType.OPPORTUNISTIC, true)));
 
@@ -381,21 +381,23 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
           new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
       amClient.removeContainerRequest(
           new AMRMClient.ContainerRequest(capability, null, null, priority2,
-              true, null,
+              0, true, null,
               ExecutionTypeRequest.newInstance(
                   ExecutionType.OPPORTUNISTIC, true)));
 
-      int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
+      RemoteRequestsTable<ContainerRequest> remoteRequestsTable =
+          amClient.getTable(0);
+      int containersRequestedNode = remoteRequestsTable.get(priority,
           node, ExecutionType.GUARANTEED, capability).remoteRequest
           .getNumContainers();
-      int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
+      int containersRequestedRack = remoteRequestsTable.get(priority,
           rack, ExecutionType.GUARANTEED, capability).remoteRequest
           .getNumContainers();
-      int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+      int containersRequestedAny = remoteRequestsTable.get(priority,
           ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
           .remoteRequest.getNumContainers();
       int oppContainersRequestedAny =
-          amClient.remoteRequestsTable.get(priority2, ResourceRequest.ANY,
+          remoteRequestsTable.get(priority2, ResourceRequest.ANY,
               ExecutionType.OPPORTUNISTIC, capability).remoteRequest
               .getNumContainers();
 
@@ -457,7 +459,7 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
           new AMRMClient.ContainerRequest(capability, nodes, racks, priority));
       amClient.removeContainerRequest(
           new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
-              true, null,
+              0, true, null,
               ExecutionTypeRequest.newInstance(
                   ExecutionType.OPPORTUNISTIC, true)));
       assertEquals(4, amClient.ask.size());
@@ -469,7 +471,7 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
           nodes, racks, priority));
       amClient.addContainerRequest(
           new AMRMClient.ContainerRequest(capability, nodes, racks, priority2,
-              true, null,
+              0, true, null,
               ExecutionTypeRequest.newInstance(
                   ExecutionType.OPPORTUNISTIC, true)));
 
@@ -490,7 +492,7 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
                         priority));
                 amc.removeContainerRequest(
                     new AMRMClient.ContainerRequest(capability, null, null,
-                        priority2, true, null,
+                        priority2, 0, true, null,
                         ExecutionTypeRequest.newInstance(
                             ExecutionType.OPPORTUNISTIC, true)));
                 throw new Exception();
@@ -571,7 +573,7 @@ public class TestDistributedScheduling extends BaseAMRMProxyE2ETest {
       ExecutionTypeRequest execTypeRequest =
           ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
       ContainerRequest containerRequest = new AMRMClient.ContainerRequest(
-          capability, nodes, racks, priority, true, null, execTypeRequest);
+          capability, nodes, racks, priority, 0, true, null, execTypeRequest);
       amClient.addContainerRequest(containerRequest);
 
       // Wait until the container is allocated

http://git-wip-us.apache.org/repos/asf/hadoop/blob/19c743c1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index 969fb70..3640883 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -252,9 +252,9 @@ public class TestNMClient {
           racks, priority));
     }
 
-    int containersRequestedAny = rmClient.remoteRequestsTable.get(priority,
-        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
-        .remoteRequest.getNumContainers();
+    int containersRequestedAny = rmClient.getTable(0)
+        .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED,
+            capability).remoteRequest.getNumContainers();
 
     // RM should allocate container within 2 calls to allocate()
     int allocatedContainerCount = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org