You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/06/12 16:42:46 UTC

[2/2] hadoop git commit: YARN-5124. Modify AMRMClient to set the ExecutionType in the ResourceRequest. (asuresh)

YARN-5124. Modify AMRMClient to set the ExecutionType in the ResourceRequest. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/51432779
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/51432779
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/51432779

Branch: refs/heads/trunk
Commit: 51432779588fdd741b4840601f5db637ec783d92
Parents: 5279af7
Author: Arun Suresh <as...@apache.org>
Authored: Sun Jun 12 09:42:38 2016 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Sun Jun 12 09:42:38 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/client/api/AMRMClient.java      |  43 +-
 .../yarn/client/api/async/AMRMClientAsync.java  |  17 +
 .../yarn/client/api/impl/AMRMClientImpl.java    | 294 +++------
 .../client/api/impl/RemoteRequestsTable.java    | 332 ++++++++++
 .../client/api/impl/BaseAMRMProxyE2ETest.java   | 197 ++++++
 .../yarn/client/api/impl/TestAMRMClient.java    |  26 +-
 .../impl/TestAMRMClientContainerRequest.java    |  54 +-
 .../yarn/client/api/impl/TestAMRMProxy.java     | 171 +----
 .../api/impl/TestDistributedScheduling.java     | 644 ++++++++++++-------
 .../yarn/client/api/impl/TestNMClient.java      |   7 +-
 .../records/impl/pb/ResourceRequestPBImpl.java  |   5 +-
 11 files changed, 1204 insertions(+), 586 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index 3ec0899..5f362c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -109,7 +110,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     final Priority priority;
     final boolean relaxLocality;
     final String nodeLabelsExpression;
-    final ExecutionType executionType;
+    final ExecutionTypeRequest executionTypeRequest;
     
     /**
      * Instantiates a {@link ContainerRequest} with the given constraints and
@@ -180,7 +181,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
         Priority priority, boolean relaxLocality, String nodeLabelsExpression) {
       this(capability, nodes, racks, priority, relaxLocality,
           nodeLabelsExpression,
-          ExecutionType.GUARANTEED);
+          ExecutionTypeRequest.newInstance());
     }
           
     /**
@@ -203,12 +204,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
      * @param nodeLabelsExpression
      *          Set node labels to allocate resource, now we only support
      *          asking for only a single node label
-     * @param executionType
+     * @param executionTypeRequest
      *          Set the execution type of the container request.
      */
     public ContainerRequest(Resource capability, String[] nodes, String[] racks,
         Priority priority, boolean relaxLocality, String nodeLabelsExpression,
-        ExecutionType executionType) {
+        ExecutionTypeRequest executionTypeRequest) {
       // Validate request
       Preconditions.checkArgument(capability != null,
           "The Resource to be requested for each container " +
@@ -226,7 +227,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       this.priority = priority;
       this.relaxLocality = relaxLocality;
       this.nodeLabelsExpression = nodeLabelsExpression;
-      this.executionType = executionType;
+      this.executionTypeRequest = executionTypeRequest;
     }
     
     public Resource getCapability() {
@@ -253,15 +254,16 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
       return nodeLabelsExpression;
     }
     
-    public ExecutionType getExecutionType() {
-      return executionType;
+    public ExecutionTypeRequest getExecutionTypeRequest() {
+      return executionTypeRequest;
     }
 
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("Capability[").append(capability).append("]");
       sb.append("Priority[").append(priority).append("]");
-      sb.append("ExecutionType[").append(executionType).append("]");
+      sb.append("ExecutionTypeRequest[").append(executionTypeRequest)
+          .append("]");
       return sb.toString();
     }
   }
@@ -388,10 +390,35 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    * collection, requests will be returned in the same order as they were added.
    * @return Collection of request matching the parameters
    */
+  @InterfaceStability.Evolving
   public abstract List<? extends Collection<T>> getMatchingRequests(
                                            Priority priority, 
                                            String resourceName, 
                                            Resource capability);
+
+  /**
+   * Get outstanding <code>ContainerRequest</code>s matching the given
+   * parameters. These ContainerRequests should have been added via
+   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
+   * the AMRMClient may return its internal collection directly without creating
+   * a copy. Users should not perform mutable operations on the return value.
+   * Each collection in the list contains requests with identical
+   * <code>Resource</code> size that fit in the given capability. In a
+   * collection, requests will be returned in the same order as they were added.
+   * Only requests matching the given <code>ExecutionType</code> are returned.
+   * @param priority Priority
+   * @param resourceName Location
+   * @param executionType ExecutionType
+   * @param capability Capability
+   * @return Collection of request matching the parameters
+   */
+  @InterfaceStability.Evolving
+  public List<? extends Collection<T>> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      Resource capability) {
+    throw new UnsupportedOperationException("The sub-class extending" +
+        " AMRMClient is expected to implement this !!");
+  }
   
   /**
    * Update application's blacklist with addition or removal resources.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index 3c8f923..2f95156 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -196,6 +197,22 @@ extends AbstractService {
                                                    Priority priority, 
                                                    String resourceName, 
                                                    Resource capability);
+
+  /**
+   * Returns all matching ContainerRequests that match the given Priority,
+   * ResourceName, ExecutionType and Capability.
+   * @param priority Priority.
+   * @param resourceName Location.
+   * @param executionType ExecutionType.
+   * @param capability Capability.
+   * @return All matching ContainerRequests
+   */
+  public List<? extends Collection<T>> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      Resource capability) {
+    return client.getMatchingRequests(priority, resourceName,
+        executionType, capability);
+  }
   
   /**
    * Registers this application master with the resource manager. On successful

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
index 4366c25..4145944 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
@@ -19,19 +19,19 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.AbstractMap.SimpleEntry;
 
@@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -102,7 +104,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected final Set<String> blacklistAdditions = new HashSet<String>();
   protected final Set<String> blacklistRemovals = new HashSet<String>();
   
-  class ResourceRequestInfo {
+  static class ResourceRequestInfo<T> {
     ResourceRequest remoteRequest;
     LinkedHashSet<T> containerRequests;
     
@@ -115,11 +117,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     }
   }
 
-
   /**
    * Class compares Resource by memory then cpu in reverse order
    */
-  class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
+  static class ResourceReverseMemoryThenCpuComparator implements
+      Comparator<Resource>, Serializable {
+    static final long serialVersionUID = 12345L;
     @Override
     public int compare(Resource arg0, Resource arg1) {
       long mem0 = arg0.getMemorySize();
@@ -141,7 +144,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       return -1;
     }    
   }
-  
+
   static boolean canFit(Resource arg0, Resource arg1) {
     long mem0 = arg0.getMemorySize();
     long mem1 = arg1.getMemorySize();
@@ -150,17 +153,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     
     return (mem0 <= mem1 && cpu0 <= cpu1);
   }
-  
-  //Key -> Priority
-  //Value -> Map
-  //Key->ResourceName (e.g., nodename, rackname, *)
-  //Value->Map
-  //Key->Resource Capability
-  //Value->ResourceRequest
-  protected final 
-  Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
-    remoteRequestsTable =
-    new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
+
+  final RemoteRequestsTable remoteRequestsTable = new RemoteRequestsTable<T>();
 
   protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
@@ -185,6 +179,12 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     super(AMRMClientImpl.class.getName());
   }
 
+  @VisibleForTesting
+  AMRMClientImpl(ApplicationMasterProtocol protocol) {
+    super(AMRMClientImpl.class.getName());
+    this.rmClient = protocol;
+  }
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     RackResolver.init(conf);
@@ -195,8 +195,10 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected void serviceStart() throws Exception {
     final YarnConfiguration conf = new YarnConfiguration(getConfig());
     try {
-      rmClient =
-          ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
+      if (rmClient == null) {
+        rmClient = ClientRMProxy.createRMProxy(
+            conf, ApplicationMasterProtocol.class);
+      }
     } catch (IOException e) {
       throw new YarnRuntimeException(e);
     }
@@ -263,7 +265,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           // RPC layer is using it to send info across
           askList.add(ResourceRequest.newInstance(r.getPriority(),
               r.getResourceName(), r.getCapability(), r.getNumContainers(),
-              r.getRelaxLocality(), r.getNodeLabelExpression()));
+              r.getRelaxLocality(), r.getNodeLabelExpression(),
+              r.getExecutionTypeRequest()));
         }
         List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
         List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
@@ -315,13 +318,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         synchronized (this) {
           release.addAll(this.pendingRelease);
           blacklistAdditions.addAll(this.blacklistedNodes);
-          for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
-            .values()) {
-            for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
-              for (ResourceRequestInfo request : capabalities.values()) {
-                addResourceRequestToAsk(request.remoteRequest);
-              }
-            }
+          @SuppressWarnings("unchecked")
+          Iterator<ResourceRequestInfo<T>> reqIter =
+              remoteRequestsTable.iterator();
+          while (reqIter.hasNext()) {
+            addResourceRequestToAsk(reqIter.next().remoteRequest);
           }
           change.putAll(this.pendingChange);
         }
@@ -517,26 +518,28 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
             + joiner.join(req.getNodes()));        
       }
       for (String node : dedupedNodes) {
-        addResourceRequest(req.getPriority(), node, req.getCapability(), req,
-            true, req.getNodeLabelExpression());
+        addResourceRequest(req.getPriority(), node,
+            req.getExecutionTypeRequest(), req.getCapability(), req, true,
+            req.getNodeLabelExpression());
       }
     }
 
     for (String rack : dedupedRacks) {
-      addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
-          true, req.getNodeLabelExpression());
+      addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
+          req.getCapability(), req, true, req.getNodeLabelExpression());
     }
 
     // Ensure node requests are accompanied by requests for
     // corresponding rack
     for (String rack : inferredRacks) {
-      addResourceRequest(req.getPriority(), rack, req.getCapability(), req,
-          req.getRelaxLocality(), req.getNodeLabelExpression());
+      addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(),
+          req.getCapability(), req, req.getRelaxLocality(),
+          req.getNodeLabelExpression());
     }
-
     // Off-switch
-    addResourceRequest(req.getPriority(), ResourceRequest.ANY, 
-        req.getCapability(), req, req.getRelaxLocality(), req.getNodeLabelExpression());
+    addResourceRequest(req.getPriority(), ResourceRequest.ANY,
+        req.getExecutionTypeRequest(), req.getCapability(), req,
+        req.getRelaxLocality(), req.getNodeLabelExpression());
   }
 
   @Override
@@ -552,16 +555,18 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     // Update resource requests
     if (req.getNodes() != null) {
       for (String node : new HashSet<String>(req.getNodes())) {
-        decResourceRequest(req.getPriority(), node, req.getCapability(), req);
+        decResourceRequest(req.getPriority(), node,
+            req.getExecutionTypeRequest(), req.getCapability(), req);
       }
     }
 
     for (String rack : allRacks) {
-      decResourceRequest(req.getPriority(), rack, req.getCapability(), req);
+      decResourceRequest(req.getPriority(), rack,
+          req.getExecutionTypeRequest(), req.getCapability(), req);
     }
 
     decResourceRequest(req.getPriority(), ResourceRequest.ANY,
-        req.getCapability(), req);
+        req.getExecutionTypeRequest(), req.getCapability(), req);
   }
 
   @Override
@@ -601,47 +606,38 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   public synchronized int getClusterNodeCount() {
     return clusterNodeCount;
   }
-  
+
+  @Override
+  public synchronized List<? extends Collection<T>> getMatchingRequests(
+      Priority priority,
+      String resourceName,
+      Resource capability) {
+    return getMatchingRequests(priority, resourceName,
+        ExecutionType.GUARANTEED, capability);
+  }
+
   @Override
   public synchronized List<? extends Collection<T>> getMatchingRequests(
-                                          Priority priority, 
-                                          String resourceName, 
-                                          Resource capability) {
+      Priority priority, String resourceName, ExecutionType executionType,
+      Resource capability) {
     Preconditions.checkArgument(capability != null,
         "The Resource to be requested should not be null ");
     Preconditions.checkArgument(priority != null,
         "The priority at which to request containers should not be null ");
     List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
-    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests = 
-        this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      return list;
-    }
-    TreeMap<Resource, ResourceRequestInfo> reqMap = remoteRequests
-        .get(resourceName);
-    if (reqMap == null) {
-      return list;
-    }
 
-    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-    if (resourceRequestInfo != null &&
-        !resourceRequestInfo.containerRequests.isEmpty()) {
-      list.add(resourceRequestInfo.containerRequests);
-      return list;
-    }
-    
-    // no exact match. Container may be larger than what was requested.
-    // get all resources <= capability. map is reverse sorted. 
-    SortedMap<Resource, ResourceRequestInfo> tailMap = 
-                                                  reqMap.tailMap(capability);
-    for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
-      if (canFit(entry.getKey(), capability) &&
-          !entry.getValue().containerRequests.isEmpty()) {
-        // match found that fits in the larger resource
-        list.add(entry.getValue().containerRequests);
+    @SuppressWarnings("unchecked")
+    List<ResourceRequestInfo<T>> matchingRequests =
+        this.remoteRequestsTable.getMatchingRequests(priority, resourceName,
+            executionType, capability);
+    // If there is no exact match, the container may be larger than what was
+    // requested. Get all resources <= capability. Map is reverse sorted.
+    for (ResourceRequestInfo<T> resReqInfo : matchingRequests) {
+      if (canFit(resReqInfo.remoteRequest.getCapability(), capability) &&
+        !resReqInfo.containerRequests.isEmpty()) {
+        list.add(resReqInfo.containerRequests);
       }
     }
-    
     // no match found
     return list;          
   }
@@ -663,34 +659,30 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     
     return racks;
   }
-  
+
   /**
    * ContainerRequests with locality relaxation cannot be made at the same
    * priority as ContainerRequests without locality relaxation.
    */
   private void checkLocalityRelaxationConflict(Priority priority,
       Collection<String> locations, boolean relaxLocality) {
-    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
-        this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      return;
-    }
     // Locality relaxation will be set to relaxLocality for all implicitly
     // requested racks. Make sure that existing rack requests match this.
-    for (String location : locations) {
-        TreeMap<Resource, ResourceRequestInfo> reqs =
-            remoteRequests.get(location);
-        if (reqs != null && !reqs.isEmpty()) {
-          boolean existingRelaxLocality =
-              reqs.values().iterator().next().remoteRequest.getRelaxLocality();
-          if (relaxLocality != existingRelaxLocality) {
-            throw new InvalidContainerRequestException("Cannot submit a "
-                + "ContainerRequest asking for location " + location
-                + " with locality relaxation " + relaxLocality + " when it has "
-                + "already been requested with locality relaxation " + existingRelaxLocality);
-          }
-        }
+
+    @SuppressWarnings("unchecked")
+    List<ResourceRequestInfo> allCapabilityMaps =
+        remoteRequestsTable.getAllResourceRequestInfos(priority, locations);
+    for (ResourceRequestInfo reqs : allCapabilityMaps) {
+      ResourceRequest remoteRequest = reqs.remoteRequest;
+      boolean existingRelaxLocality = remoteRequest.getRelaxLocality();
+      if (relaxLocality != existingRelaxLocality) {
+        throw new InvalidContainerRequestException("Cannot submit a "
+            + "ContainerRequest asking for location "
+            + remoteRequest.getResourceName() + " with locality relaxation "
+            + relaxLocality + " when it has already been requested"
+            + "with locality relaxation " + existingRelaxLocality);
       }
+    }
   }
   
   /**
@@ -747,46 +739,13 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     ask.add(remoteRequest);
   }
 
-  private void
-      addResourceRequest(Priority priority, String resourceName,
-          Resource capability, T req, boolean relaxLocality,
-          String labelExpression) {
-    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    if (remoteRequests == null) {
-      remoteRequests = 
-          new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
-      this.remoteRequestsTable.put(priority, remoteRequests);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Added priority=" + priority);
-      }
-    }
-    TreeMap<Resource, ResourceRequestInfo> reqMap = 
-                                          remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      // capabilities are stored in reverse sorted order. smallest last.
-      reqMap = new TreeMap<Resource, ResourceRequestInfo>(
-          new ResourceReverseMemoryThenCpuComparator());
-      remoteRequests.put(resourceName, reqMap);
-    }
-    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-    if (resourceRequestInfo == null) {
-      resourceRequestInfo =
-          new ResourceRequestInfo(priority, resourceName, capability,
-              relaxLocality);
-      reqMap.put(capability, resourceRequestInfo);
-    }
-    
-    resourceRequestInfo.remoteRequest.setNumContainers(
-         resourceRequestInfo.remoteRequest.getNumContainers() + 1);
-
-    if (relaxLocality) {
-      resourceRequestInfo.containerRequests.add(req);
-    }
-    
-    if (ResourceRequest.ANY.equals(resourceName)) {
-      resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
-    }
+  private void addResourceRequest(Priority priority, String resourceName,
+      ExecutionTypeRequest execTypeReq, Resource capability, T req,
+      boolean relaxLocality, String labelExpression) {
+    @SuppressWarnings("unchecked")
+    ResourceRequestInfo resourceRequestInfo = remoteRequestsTable
+        .addResourceRequest(priority, resourceName,
+        execTypeReq, capability, req, relaxLocality, labelExpression);
 
     // Note this down for next interaction with ResourceManager
     addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
@@ -800,70 +759,31 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     }
   }
 
-  private void decResourceRequest(Priority priority, 
-                                   String resourceName,
-                                   Resource capability, 
-                                   T req) {
-    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
-      this.remoteRequestsTable.get(priority);
-    
-    if(remoteRequests == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as priority " + priority 
-            + " is not present in request table");
-      }
-      return;
-    }
-    
-    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
-    if (reqMap == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Not decrementing resource as " + resourceName
-            + " is not present in request table");
-      }
-      return;
-    }
-    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
-          + " priority=" + priority.getPriority()
-          + " resourceName=" + resourceName + " numContainers="
-          + resourceRequestInfo.remoteRequest.getNumContainers() 
-          + " #asks=" + ask.size());
-    }
-
-    resourceRequestInfo.remoteRequest.setNumContainers(
-        resourceRequestInfo.remoteRequest.getNumContainers() - 1);
-
-    resourceRequestInfo.containerRequests.remove(req);
-    
-    if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
-      // guard against spurious removals
-      resourceRequestInfo.remoteRequest.setNumContainers(0);
-    }
+  private void decResourceRequest(Priority priority, String resourceName,
+      ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+    @SuppressWarnings("unchecked")
+    ResourceRequestInfo resourceRequestInfo =
+        remoteRequestsTable.decResourceRequest(priority, resourceName,
+            execTypeReq, capability, req);
     // send the ResourceRequest to RM even if is 0 because it needs to override
     // a previously sent value. If ResourceRequest was not sent previously then
     // sending 0 aught to be a no-op on RM
-    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
+    if (resourceRequestInfo != null) {
+      addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
 
-    // delete entries from map if no longer needed
-    if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
-      reqMap.remove(capability);
-      if (reqMap.size() == 0) {
-        remoteRequests.remove(resourceName);
+      // delete entry from map if no longer needed
+      if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
+        this.remoteRequestsTable.remove(priority, resourceName,
+            execTypeReq.getExecutionType(), capability);
       }
-      if (remoteRequests.size() == 0) {
-        remoteRequestsTable.remove(priority);
-      }
-    }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("AFTER decResourceRequest:" + " applicationId="
-          + " priority=" + priority.getPriority()
-          + " resourceName=" + resourceName + " numContainers="
-          + resourceRequestInfo.remoteRequest.getNumContainers() 
-          + " #asks=" + ask.size());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AFTER decResourceRequest:" + " applicationId="
+            + " priority=" + priority.getPriority()
+            + " resourceName=" + resourceName + " numContainers="
+            + resourceRequestInfo.remoteRequest.getNumContainers()
+            + " #asks=" + ask.size());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
new file mode 100644
index 0000000..853a512
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceReverseMemoryThenCpuComparator;
+
+class RemoteRequestsTable<T> implements Iterable<ResourceRequestInfo>{
+
+  private static final Log LOG = LogFactory.getLog(RemoteRequestsTable.class);
+
+  static ResourceReverseMemoryThenCpuComparator resourceComparator =
+      new ResourceReverseMemoryThenCpuComparator();
+
+  /**
+   * Nested Iterator that iterates over just the ResourceRequestInfo
+   * object.
+   */
+  class RequestInfoIterator implements Iterator<ResourceRequestInfo> {
+    private Iterator<Map<String, Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>>> iLocMap;
+    private Iterator<Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>> iExecTypeMap;
+    private Iterator<TreeMap<Resource, ResourceRequestInfo>> iCapMap;
+    private Iterator<ResourceRequestInfo> iResReqInfo;
+
+    public RequestInfoIterator(Iterator<Map<String,
+        Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>>>
+        iLocationMap) {
+      this.iLocMap = iLocationMap;
+      if (iLocMap.hasNext()) {
+        iExecTypeMap = iLocMap.next().values().iterator();
+      } else {
+        iExecTypeMap =
+            new LinkedList<Map<ExecutionType, TreeMap<Resource,
+                ResourceRequestInfo>>>().iterator();
+      }
+      if (iExecTypeMap.hasNext()) {
+        iCapMap = iExecTypeMap.next().values().iterator();
+      } else {
+        iCapMap =
+            new LinkedList<TreeMap<Resource, ResourceRequestInfo>>()
+                .iterator();
+      }
+      if (iCapMap.hasNext()) {
+        iResReqInfo = iCapMap.next().values().iterator();
+      } else {
+        iResReqInfo = new LinkedList<ResourceRequestInfo>().iterator();
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iLocMap.hasNext()
+          || iExecTypeMap.hasNext()
+          || iCapMap.hasNext()
+          || iResReqInfo.hasNext();
+    }
+
+    @Override
+    public ResourceRequestInfo next() {
+      if (!iResReqInfo.hasNext()) {
+        if (!iCapMap.hasNext()) {
+          if (!iExecTypeMap.hasNext()) {
+            iExecTypeMap = iLocMap.next().values().iterator();
+          }
+          iCapMap = iExecTypeMap.next().values().iterator();
+        }
+        iResReqInfo = iCapMap.next().values().iterator();
+      }
+      return iResReqInfo.next();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove is not supported" +
+          "for this iterator !!");
+    }
+  }
+
+  // Nested map whose keys are, from the outermost level inwards:
+  // Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource)
+  // and whose leaf values are ResourceRequestInfo objects.
+  private Map<Priority, Map<String, Map<ExecutionType, TreeMap<Resource,
+      ResourceRequestInfo>>>> remoteRequestsTable = new HashMap<>();
+
+  @Override
+  public Iterator<ResourceRequestInfo> iterator() { // walks all leaf entries
+    return new RequestInfoIterator(remoteRequestsTable.values().iterator());
+  }
+
+  /** Looks up the entry stored under the four-part key, or returns null. */
+  ResourceRequestInfo get(Priority priority, String location,
+      ExecutionType execType, Resource capability) {
+    TreeMap<Resource, ResourceRequestInfo> byCapability =
+        getCapabilityMap(priority, location, execType);
+    // A null map means one of the outer key levels is absent, so there
+    // cannot be an entry for this capability either.
+    return (byCapability == null) ? null : byCapability.get(capability);
+  }
+
+  void put(Priority priority, String resourceName, ExecutionType execType,
+      Resource capability, ResourceRequestInfo resReqInfo) {
+    Map<String, Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>> locationMap =
+        remoteRequestsTable.get(priority);
+    if (locationMap == null) { // first entry for this priority
+      locationMap = new HashMap<>();
+      this.remoteRequestsTable.put(priority, locationMap);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added priority=" + priority);
+      }
+    }
+    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>> execTypeMap =
+        locationMap.get(resourceName);
+    if (execTypeMap == null) { // first entry for this resource name
+      execTypeMap = new HashMap<>();
+      locationMap.put(resourceName, execTypeMap);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added resourceName=" + resourceName);
+      }
+    }
+    TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+        execTypeMap.get(execType);
+    if (capabilityMap == null) { // first entry for this execution type
+      capabilityMap = new TreeMap<>(resourceComparator);
+      execTypeMap.put(execType, capabilityMap);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added Execution Type=" + execType);
+      }
+    }
+    capabilityMap.put(capability, resReqInfo); // replaces any existing entry
+  }
+
+  /** Removes the keyed entry and prunes any map levels it leaves empty. */
+  ResourceRequestInfo remove(Priority priority, String resourceName,
+      ExecutionType execType, Resource capability) {
+    Map<String, Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>> byLocation = remoteRequestsTable.get(priority);
+    if (byLocation == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No such priority=" + priority);
+      }
+      return null;
+    }
+    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+        byExecType = byLocation.get(resourceName);
+    if (byExecType == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No such resourceName=" + resourceName);
+      }
+      return null;
+    }
+    TreeMap<Resource, ResourceRequestInfo> byCapability =
+        byExecType.get(execType);
+    if (byCapability == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No such Execution Type=" + execType);
+      }
+      return null;
+    }
+    ResourceRequestInfo removed = byCapability.remove(capability);
+    if (byCapability.isEmpty()) {
+      byExecType.remove(execType);
+      if (byExecType.isEmpty()) {
+        byLocation.remove(resourceName);
+        if (byLocation.isEmpty()) {
+          this.remoteRequestsTable.remove(priority);
+        }
+      }
+    }
+    return removed;
+  }
+
+  Map<String, Map<ExecutionType, TreeMap<Resource,
+      ResourceRequestInfo>>> getLocationMap(Priority priority) {
+    return remoteRequestsTable.get(priority); // null for an unknown priority
+  }
+
+  /** Returns the execution-type level for a location, or null if absent. */
+  Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+      getExecutionTypeMap(Priority priority, String location) {
+    Map<String, Map<ExecutionType, TreeMap<Resource,
+        ResourceRequestInfo>>> byLocation = getLocationMap(priority);
+    // An unknown priority has no location level to descend into; an
+    // unknown location simply yields null from the map lookup.
+    return (byLocation == null) ? null : byLocation.get(location);
+  }
+
+  TreeMap<Resource, ResourceRequestInfo> getCapabilityMap(
+      Priority priority, String location, ExecutionType execType) {
+    Map<ExecutionType, TreeMap<Resource, ResourceRequestInfo>>
+        byExecType = getExecutionTypeMap(priority, location);
+    // A missing priority or location leaves no execution-type level to
+    // search, so report null; otherwise defer to the map lookup, which is
+    // itself null when the execution type has no entries.
+    return (byExecType == null) ? null : byExecType.get(execType);
+  }
+
+
+  /** Collects the entries of every execution type at the given locations. */
+  List<ResourceRequestInfo> getAllResourceRequestInfos(Priority priority,
+      Collection<String> locations) {
+    List<ResourceRequestInfo> retList = new LinkedList<>();
+    for (String location : locations) {
+      for (ExecutionType eType : ExecutionType.values()) {
+        TreeMap<Resource, ResourceRequestInfo> capabilityMap =
+            getCapabilityMap(priority, location, eType);
+        if (capabilityMap != null) {
+          retList.addAll(capabilityMap.values());
+        }
+      }
+    }
+    return retList;
+  }
+
+  List<ResourceRequestInfo> getMatchingRequests(
+      Priority priority, String resourceName, ExecutionType executionType,
+      Resource capability) {
+    List<ResourceRequestInfo> matches = new LinkedList<>();
+    TreeMap<Resource, ResourceRequestInfo> byCapability =
+        getCapabilityMap(priority, resourceName, executionType);
+    ResourceRequestInfo exact =
+        (byCapability == null) ? null : byCapability.get(capability);
+    if (exact != null) {
+      matches.add(exact);
+    } else if (byCapability != null) {
+      // No exact entry: fall back to every entry ordered at/after the key.
+      matches.addAll(byCapability.tailMap(capability).values());
+    }
+    return matches;
+  }
+
+  @SuppressWarnings("unchecked")
+  ResourceRequestInfo addResourceRequest(Priority priority, String resourceName,
+      ExecutionTypeRequest execTypeReq, Resource capability, T req,
+      boolean relaxLocality, String labelExpression) {
+    ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
+        execTypeReq.getExecutionType(), capability);
+    if (resourceRequestInfo == null) { // first request for this key
+      resourceRequestInfo =
+          new ResourceRequestInfo(priority, resourceName, capability,
+              relaxLocality);
+      put(priority, resourceName, execTypeReq.getExecutionType(), capability,
+          resourceRequestInfo);
+    }
+    resourceRequestInfo.remoteRequest.setExecutionTypeRequest(execTypeReq);
+    resourceRequestInfo.remoteRequest.setNumContainers(
+        resourceRequestInfo.remoteRequest.getNumContainers() + 1);
+    // The originating request is only recorded when locality is relaxed.
+    if (relaxLocality) {
+      resourceRequestInfo.containerRequests.add(req);
+    }
+    // The node label expression is applied only when resourceName is ANY.
+    if (ResourceRequest.ANY.equals(resourceName)) {
+      resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
+    }
+    return resourceRequestInfo;
+  }
+
+  /**
+   * Decrements the container count of the matching entry (never below
+   * zero) and detaches {@code req}; returns null when no entry matches.
+   */
+  ResourceRequestInfo decResourceRequest(Priority priority, String resourceName,
+      ExecutionTypeRequest execTypeReq, Resource capability, T req) {
+    ResourceRequestInfo resourceRequestInfo = get(priority, resourceName,
+        execTypeReq.getExecutionType(), capability);
+    if (resourceRequestInfo == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not decrementing resource as ResourceRequestInfo with " +
+            "priority=" + priority + ", " +
+            "resourceName=" + resourceName + ", " +
+            "executionType=" + execTypeReq + ", " +
+            "capability=" + capability + " is not present in request table");
+      }
+      return null;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("BEFORE decResourceRequest:" + " applicationId="
+          + " priority=" + priority.getPriority()
+          + " resourceName=" + resourceName + " numContainers="
+          + resourceRequestInfo.remoteRequest.getNumContainers());
+    }
+    resourceRequestInfo.remoteRequest.setNumContainers(
+        resourceRequestInfo.remoteRequest.getNumContainers() - 1);
+    resourceRequestInfo.containerRequests.remove(req);
+
+    if (resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
+      // guard against spurious removals
+      resourceRequestInfo.remoteRequest.setNumContainers(0);
+    }
+    return resourceRequestInfo;
+  }
+
+  boolean isEmpty() {
+    return remoteRequestsTable.isEmpty(); // true when no priority is stored
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java
new file mode 100644
index 0000000..0b62054
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMProxyE2ETest.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Base test case to be used for Testing frameworks that use AMRMProxy.
+ */
+public abstract class BaseAMRMProxyE2ETest {
+
+  protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
+      ApplicationId appId, MiniYARNCluster cluster,
+      final Configuration yarnConf)
+      throws IOException, InterruptedException, YarnException {
+    // Creates an AM protocol proxy that runs as the attempt's proxy user.
+    UserGroupInformation user = null;
+
+    // Act as a proxy user named after the application's current attempt.
+
+    ApplicationReport report = rmClient.getApplicationReport(appId);
+
+    user = UserGroupInformation.createProxyUser(
+        report.getCurrentApplicationAttemptId().toString(),
+        UserGroupInformation.getCurrentUser());
+    // Mint an AMRM token from the AMRMProxy secret manager of NodeManager 0.
+    ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
+        .getNodeManager(0).getNMContext().getContainerManager();
+
+    AMRMProxyTokenSecretManager amrmTokenSecretManager =
+        containerManager.getAMRMProxyService().getSecretManager();
+    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
+        amrmTokenSecretManager
+            .createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
+    // Point the token at the AMRMProxy bind address and give it to the user.
+    SecurityUtil.setTokenService(token,
+        containerManager.getAMRMProxyService().getBindAddress());
+    user.addToken(token);
+
+    // Create the ApplicationMasterProtocol proxy while running as that user.
+
+    return user
+        .doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
+          @Override
+          public ApplicationMasterProtocol run() throws Exception {
+            return ClientRMProxy.createRMProxy(yarnConf,
+                ApplicationMasterProtocol.class);
+          }
+        });
+  }
+
+  protected AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
+    // Let a throwaway AMRMClient build the asks so the request looks like
+    // one produced through the real client code path.
+    AMRMClientImpl<AMRMClient.ContainerRequest> client =
+        new AMRMClientImpl<>();
+
+    // Ask twice for the same (1024, 2) resource on the first reported node.
+    Resource size = Resource.newInstance(1024, 2);
+    String firstHost = listNode.get(0).getNodeId().getHost();
+    AMRMClient.ContainerRequest containerRequest =
+        new AMRMClient.ContainerRequest(size, new String[] {firstHost}, null,
+            Priority.newInstance(1));
+    client.addContainerRequest(containerRequest);
+    client.addContainerRequest(containerRequest);
+
+    // Copy out the pending asks that the client accumulated.
+    List<ResourceRequest> asks = new ArrayList<>();
+    for (ResourceRequest pending : client.ask) {
+      asks.add(pending);
+    }
+
+    // The blacklist request carries two empty lists.
+    ResourceBlacklistRequest emptyBlacklist = ResourceBlacklistRequest
+        .newInstance(new ArrayList<>(), new ArrayList<>());
+
+    int firstResponseId = 1;
+    return AllocateRequest.newInstance(firstResponseId, 0, asks,
+        new ArrayList<>(), emptyBlacklist);
+  }
+
+  /**
+   * Submits a minimal application and waits, polling at a short interval,
+   * until its current attempt is LAUNCHED; returns that attempt's id.
+   */
+  protected ApplicationAttemptId createApp(YarnClient yarnClient,
+      MiniYARNCluster yarnCluster, Configuration conf) throws Exception {
+    ApplicationSubmissionContext appContext =
+        yarnClient.createApplication().getApplicationSubmissionContext();
+    ApplicationId appId = appContext.getApplicationId();
+    appContext.setApplicationName("Test");
+
+    Priority pri = Records.newRecord(Priority.class);
+    pri.setPriority(0);
+    appContext.setPriority(pri);
+    appContext.setQueue("default");
+
+    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
+        Collections.<String, LocalResource> emptyMap(),
+        new HashMap<String, String>(), Arrays.asList("sleep", "10000"),
+        new HashMap<String, ByteBuffer>(), null,
+        new HashMap<ApplicationAccessType, String>());
+    appContext.setAMContainerSpec(amContainer);
+    appContext.setResource(Resource.newInstance(1024, 1));
+
+    SubmitApplicationRequest appRequest =
+        Records.newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+
+    yarnClient.submitApplication(appContext);
+
+    RMAppAttempt appAttempt = null;
+    ApplicationAttemptId attemptId = null;
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport
+          .getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
+            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
+        while (appAttempt.getAppAttemptState() != RMAppAttemptState.LAUNCHED) {
+          Thread.sleep(100);
+        }
+        break;
+      }
+      // Sleep between polls rather than busy-spinning against the RM.
+      Thread.sleep(100);
+    }
+    Thread.sleep(1000);
+    // Just dig into the ResourceManager and get the AMRMToken just for the sake
+    // of testing.
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+
+    // emulate RM setup of AMRM token in credentials by adding the token
+    // *before* setting the token service
+    UserGroupInformation.getCurrentUser().addToken(appAttempt.getAMRMToken());
+    appAttempt.getAMRMToken().setService(
+        ClientRMProxy.getAMRMTokenService(conf));
+    return attemptId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 75b49d0..99bfca5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -413,11 +414,13 @@ public class TestAMRMClient {
       amClient.addContainerRequest(storedContainer3);
       
       // test addition and storage
-      int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
-       .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+      int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          .remoteRequest.getNumContainers();
       assertEquals(2, containersRequestedAny);
-      containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
-          .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+      containersRequestedAny = amClient.remoteRequestsTable.get(priority1,
+          ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+          .remoteRequest.getNumContainers();
          assertEquals(1, containersRequestedAny);
       List<? extends Collection<ContainerRequest>> matches = 
           amClient.getMatchingRequests(priority, node, capability);
@@ -919,12 +922,15 @@ public class TestAMRMClient {
     amClient.removeContainerRequest(
         new ContainerRequest(capability, nodes, racks, priority));
     
-    int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
-        .get(node).get(capability).remoteRequest.getNumContainers();
-    int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
-        .get(rack).get(capability).remoteRequest.getNumContainers();
-    int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
-    .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+    int containersRequestedNode = amClient.remoteRequestsTable.get(priority,
+        node, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedRack = amClient.remoteRequestsTable.get(priority,
+        rack, ExecutionType.GUARANTEED, capability).remoteRequest
+        .getNumContainers();
+    int containersRequestedAny = amClient.remoteRequestsTable.get(priority,
+        ResourceRequest.ANY, ExecutionType.GUARANTEED, capability)
+        .remoteRequest.getNumContainers();
 
     assertEquals(2, containersRequestedNode);
     assertEquals(2, containersRequestedRack);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
index cb8c86a..2db33c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
@@ -26,6 +26,8 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -35,6 +37,46 @@ import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
 import org.junit.Test;
 
 public class TestAMRMClientContainerRequest {
+
+  @Test
+  public void testOpportunisticAndGuaranteedRequests() {
+    AMRMClientImpl<ContainerRequest> client =
+        new AMRMClientImpl<ContainerRequest>();
+
+    Configuration conf = new Configuration();
+    conf.setClass(
+        CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        MyResolver.class, DNSToSwitchMapping.class);
+    client.init(conf);
+
+    Resource capability = Resource.newInstance(1024, 1);
+    ContainerRequest request =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            new String[] {"/rack2"}, Priority.newInstance(1));
+    client.addContainerRequest(request); // no execution type specified
+    verifyResourceRequest(client, request, "host1", true); // GUARANTEED lookup
+    verifyResourceRequest(client, request, "host2", true);
+    verifyResourceRequest(client, request, "/rack1", true);
+    verifyResourceRequest(client, request, "/rack2", true);
+    verifyResourceRequest(client, request, ResourceRequest.ANY, true);
+    ContainerRequest request2 =
+        new ContainerRequest(capability, new String[] {"host1", "host2"},
+            new String[] {"/rack2"}, Priority.newInstance(1), true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true));
+    client.addContainerRequest(request2); // same priority and capability
+    verifyResourceRequest(client, request, "host1", true,
+        ExecutionType.OPPORTUNISTIC);
+    verifyResourceRequest(client, request, "host2", true,
+        ExecutionType.OPPORTUNISTIC);
+    verifyResourceRequest(client, request, "/rack1", true,
+        ExecutionType.OPPORTUNISTIC);
+    verifyResourceRequest(client, request, "/rack2", true,
+        ExecutionType.OPPORTUNISTIC);
+    verifyResourceRequest(client, request, ResourceRequest.ANY, true,
+        ExecutionType.OPPORTUNISTIC);
+  }
+
   @Test
   public void testFillInRacks() {
     AMRMClientImpl<ContainerRequest> client =
@@ -224,8 +266,16 @@ public class TestAMRMClientContainerRequest {
   private void verifyResourceRequest(
       AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
       String location, boolean expectedRelaxLocality) {
-    ResourceRequest ask =  client.remoteRequestsTable.get(request.getPriority())
-        .get(location).get(request.getCapability()).remoteRequest;
+    verifyResourceRequest(client, request, location, expectedRelaxLocality,
+        ExecutionType.GUARANTEED);
+  }
+
+  private void verifyResourceRequest(
+      AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
+      String location, boolean expectedRelaxLocality,
+      ExecutionType executionType) {
+    ResourceRequest ask = client.remoteRequestsTable.get(request.getPriority(),
+        location, executionType, request.getCapability()).remoteRequest;
     assertEquals(location, ask.getResourceName());
     assertEquals(1, ask.getNumContainers());
     assertEquals(expectedRelaxLocality, ask.getRelaxLocality());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/51432779/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
index f1e3f03..33f7527 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMProxy.java
@@ -19,20 +19,12 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -40,43 +32,25 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyTokenSecretManager;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TestAMRMProxy {
+/**
+ * End-to-End test cases for the AMRMProxy Service.
+ */
+public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
 
   private static final Log LOG = LogFactory.getLog(TestAMRMProxy.class);
 
@@ -84,7 +58,7 @@ public class TestAMRMProxy {
    * This test validates register, allocate and finish of an application through
    * the AMRMPRoxy.
    */
-  @Test(timeout = 60000)
+  @Test(timeout = 120000)
   public void testAMRMProxyE2E() throws Exception {
     MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
     YarnClient rmClient = null;
@@ -107,7 +81,8 @@ public class TestAMRMProxy {
 
       // Submit application
 
-      ApplicationId appId = createApp(rmClient, cluster);
+      ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+      ApplicationId appId = appAttmptId.getApplicationId();
 
       client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
 
@@ -173,7 +148,7 @@ public class TestAMRMProxy {
    * that the received token it is different from the previous one within 5
    * requests.
    */
-  @Test(timeout = 60000)
+  @Test(timeout = 120000)
   public void testE2ETokenRenewal() throws Exception {
     MiniYARNCluster cluster =
         new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
@@ -201,7 +176,8 @@ public class TestAMRMProxy {
 
       // Submit
 
-      ApplicationId appId = createApp(rmClient, cluster);
+      ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+      ApplicationId appId = appAttmptId.getApplicationId();
 
       client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
 
@@ -252,7 +228,7 @@ public class TestAMRMProxy {
    * This test validates that an AM cannot register directly to the RM, with the
    * token provided by the AMRMProxy.
    */
-  @Test(timeout = 60000)
+  @Test(timeout = 120000)
   public void testE2ETokenSwap() throws Exception {
     MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
     YarnClient rmClient = null;
@@ -270,7 +246,8 @@ public class TestAMRMProxy {
       rmClient.init(yarnConf);
       rmClient.start();
 
-      ApplicationId appId = createApp(rmClient, cluster);
+      ApplicationAttemptId appAttmptId = createApp(rmClient, cluster, conf);
+      ApplicationId appId = appAttmptId.getApplicationId();
 
       client = createAMRMProtocol(rmClient, appId, cluster, yarnConf);
 
@@ -290,124 +267,4 @@ public class TestAMRMProxy {
       cluster.stop();
     }
   }
-
-  protected ApplicationMasterProtocol createAMRMProtocol(YarnClient rmClient,
-      ApplicationId appId, MiniYARNCluster cluster,
-      final Configuration yarnConf)
-          throws IOException, InterruptedException, YarnException {
-
-    UserGroupInformation user = null;
-
-    // Get the AMRMToken from AMRMProxy
-
-    ApplicationReport report = rmClient.getApplicationReport(appId);
-
-    user = UserGroupInformation.createProxyUser(
-        report.getCurrentApplicationAttemptId().toString(),
-        UserGroupInformation.getCurrentUser());
-
-    ContainerManagerImpl containerManager = (ContainerManagerImpl) cluster
-        .getNodeManager(0).getNMContext().getContainerManager();
-
-    AMRMProxyTokenSecretManager amrmTokenSecretManager =
-        containerManager.getAMRMProxyService().getSecretManager();
-    org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> token =
-        amrmTokenSecretManager
-            .createAndGetAMRMToken(report.getCurrentApplicationAttemptId());
-
-    SecurityUtil.setTokenService(token,
-        containerManager.getAMRMProxyService().getBindAddress());
-    user.addToken(token);
-
-    // Start Application Master
-
-    return user
-        .doAs(new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
-          @Override
-          public ApplicationMasterProtocol run() throws Exception {
-            return ClientRMProxy.createRMProxy(yarnConf,
-                ApplicationMasterProtocol.class);
-          }
-        });
-  }
-
-  protected AllocateRequest createAllocateRequest(List<NodeReport> listNode) {
-    // The test needs AMRMClient to create a real allocate request
-    AMRMClientImpl<ContainerRequest> amClient =
-        new AMRMClientImpl<ContainerRequest>();
-
-    Resource capability = Resource.newInstance(1024, 2);
-    Priority priority = Priority.newInstance(1);
-    List<NodeReport> nodeReports = listNode;
-    String node = nodeReports.get(0).getNodeId().getHost();
-    String[] nodes = new String[] { node };
-
-    ContainerRequest storedContainer1 =
-        new ContainerRequest(capability, nodes, null, priority);
-    amClient.addContainerRequest(storedContainer1);
-    amClient.addContainerRequest(storedContainer1);
-
-    List<ResourceRequest> resourceAsk = new ArrayList<ResourceRequest>();
-    for (ResourceRequest rr : amClient.ask) {
-      resourceAsk.add(rr);
-    }
-
-    ResourceBlacklistRequest resourceBlacklistRequest = ResourceBlacklistRequest
-        .newInstance(new ArrayList<String>(), new ArrayList<String>());
-
-    int responseId = 1;
-
-    return AllocateRequest.newInstance(responseId, 0, resourceAsk,
-        new ArrayList<ContainerId>(), resourceBlacklistRequest);
-  }
-
-  protected ApplicationId createApp(YarnClient yarnClient,
-      MiniYARNCluster yarnCluster) throws Exception {
-
-    ApplicationSubmissionContext appContext =
-        yarnClient.createApplication().getApplicationSubmissionContext();
-    ApplicationId appId = appContext.getApplicationId();
-
-    appContext.setApplicationName("Test");
-
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(0);
-    appContext.setPriority(pri);
-
-    appContext.setQueue("default");
-
-    ContainerLaunchContext amContainer = BuilderUtils.newContainerLaunchContext(
-        Collections.<String, LocalResource> emptyMap(),
-        new HashMap<String, String>(), Arrays.asList("sleep", "10000"),
-        new HashMap<String, ByteBuffer>(), null,
-        new HashMap<ApplicationAccessType, String>());
-    appContext.setAMContainerSpec(amContainer);
-    appContext.setResource(Resource.newInstance(1024, 1));
-
-    SubmitApplicationRequest appRequest =
-        Records.newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-
-    yarnClient.submitApplication(appContext);
-
-    RMAppAttempt appAttempt = null;
-    while (true) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport
-          .getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
-        ApplicationAttemptId attemptId =
-            appReport.getCurrentApplicationAttemptId();
-        appAttempt = yarnCluster.getResourceManager().getRMContext().getRMApps()
-            .get(attemptId.getApplicationId()).getCurrentAppAttempt();
-        while (true) {
-          if (appAttempt.getAppAttemptState() == RMAppAttemptState.LAUNCHED) {
-            break;
-          }
-        }
-        break;
-      }
-    }
-    Thread.sleep(1000);
-    return appId;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org