You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/06/01 10:31:50 UTC

svn commit: r1488487 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ hadoop-yarn/hadoop-yar...

Author: bikas
Date: Sat Jun  1 08:31:50 2013
New Revision: 1488487

URL: http://svn.apache.org/r1488487
Log:
Merge r1488485 from trunk for YARN-660. Improve AMRMClient with matching requests (bikas)

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1488487&r1=1488486&r2=1488487&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Jun  1 08:31:50 2013
@@ -219,6 +219,8 @@ Release 2.0.5-beta - UNRELEASED
     YARN-638. Modified ResourceManager to restore RMDelegationTokens after
     restarting. (Jian He via vinodkv)
 
+    YARN-660. Improve AMRMClient with matching requests (bikas)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1488487&r1=1488486&r2=1488487&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Sat Jun  1 08:31:50 2013
@@ -151,7 +151,7 @@ public class ApplicationMaster {
   private YarnRPC rpc;
 
   // Handle to communicate with the Resource Manager
-  private AMRMClientAsync resourceManager;
+  private AMRMClientAsync<ContainerRequest> resourceManager;
   
   // Application Attempt Id ( combination of attemptId and fail count )
   private ApplicationAttemptId appAttemptID;
@@ -442,7 +442,9 @@ public class ApplicationMaster {
 
     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
     
-    resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
+    resourceManager = new AMRMClientAsync<ContainerRequest>(appAttemptID, 
+                                                            1000, 
+                                                            allocListener);
     resourceManager.init(conf);
     resourceManager.start();
 
@@ -522,7 +524,8 @@ public class ApplicationMaster {
     FinalApplicationStatus appStatus;
     String appMessage = null;
     success = true;
-    if (numFailedContainers.get() == 0) {
+    if (numFailedContainers.get() == 0 && 
+        numCompletedContainers.get() == numTotalContainers) {
       appStatus = FinalApplicationStatus.SUCCEEDED;
     } else {
       appStatus = FinalApplicationStatus.FAILED;
@@ -594,11 +597,6 @@ public class ApplicationMaster {
         resourceManager.addContainerRequest(containerAsk);
       }
       
-      // set progress to deliver to RM on next heartbeat
-      float progress = (float) numCompletedContainers.get()
-          / numTotalContainers;
-      resourceManager.setProgress(progress);
-      
       if (numCompletedContainers.get() == numTotalContainers) {
         done = true;
       }
@@ -637,6 +635,19 @@ public class ApplicationMaster {
 
     @Override
     public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+    @Override
+    public float getProgress() {
+      // set progress to deliver to RM on next heartbeat
+      float progress = (float) numCompletedContainers.get()
+          / numTotalContainers;
+      return progress;
+    }
+
+    @Override
+    public void onError(Exception e) {
+      done = true;
+    }
   }
 
   /**

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1488487&r1=1488486&r2=1488487&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Sat Jun  1 08:31:50 2013
@@ -18,8 +18,9 @@
 
 package org.apache.hadoop.yarn.client;
 
-
 import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -32,9 +33,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.service.Service;
 
+import com.google.common.collect.ImmutableList;
+
 @InterfaceAudience.Public
 @InterfaceStability.Unstable
-public interface AMRMClient extends Service {
+public interface AMRMClient<T extends AMRMClient.ContainerRequest> extends Service {
 
   /**
    * Object to represent container request for resources.
@@ -43,20 +46,41 @@ public interface AMRMClient extends Serv
    * Can ask for multiple containers of a given type.
    */
   public static class ContainerRequest {
-    Resource capability;
-    String[] hosts;
-    String[] racks;
-    Priority priority;
-    int containerCount;
+    final Resource capability;
+    final ImmutableList<String> hosts;
+    final ImmutableList<String> racks;
+    final Priority priority;
+    final int containerCount;
         
     public ContainerRequest(Resource capability, String[] hosts,
         String[] racks, Priority priority, int containerCount) {
       this.capability = capability;
-      this.hosts = (hosts != null ? hosts.clone() : null);
-      this.racks = (racks != null ? racks.clone() : null);
+      this.hosts = (hosts != null ? ImmutableList.copyOf(hosts) : null);
+      this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
       this.priority = priority;
       this.containerCount = containerCount;
     }
+    
+    public Resource getCapability() {
+      return capability;
+    }
+    
+    public ImmutableList<String> getHosts() {
+      return hosts;
+    }
+    
+    public ImmutableList<String> getRacks() {
+      return racks;
+    }
+    
+    public Priority getPriority() {
+      return priority;
+    }
+    
+    public int getContainerCount() {
+      return containerCount;
+    }
+    
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("Capability[").append(capability).append("]");
@@ -65,6 +89,22 @@ public interface AMRMClient extends Serv
       return sb.toString();
     }
   }
+ 
+  /**
+   * This creates a <code>ContainerRequest</code> for 1 container and the
+   * AMRMClient stores this request internally. <code>getMatchingRequests</code>
+   * can be used to retrieve these requests from AMRMClient. These requests may 
+   * be matched with an allocated container to determine which request to assign
+   * the container to. <code>removeContainerRequest</code> must be called using 
+   * the same assigned <code>StoredContainerRequest</code> object so that 
+   * AMRMClient can remove it from its internal store.
+   */
+  public static class StoredContainerRequest extends ContainerRequest {    
+    public StoredContainerRequest(Resource capability, String[] hosts,
+        String[] racks, Priority priority) {
+      super(capability, hosts, racks, priority, 1);
+    }
+  }
   
   /**
    * Register the application master. This must be called before any 
@@ -117,7 +157,7 @@ public interface AMRMClient extends Serv
    * Request containers for resources before calling <code>allocate</code>
    * @param req Resource request
    */
-  public void addContainerRequest(ContainerRequest req);
+  public void addContainerRequest(T req);
   
   /**
    * Remove previous container request. The previous container request may have 
@@ -126,7 +166,7 @@ public interface AMRMClient extends Serv
    * even after the remove request
    * @param req Resource request
    */
-  public void removeContainerRequest(ContainerRequest req);
+  public void removeContainerRequest(T req);
   
   /**
    * Release containers assigned by the Resource Manager. If the app cannot use
@@ -150,4 +190,21 @@ public interface AMRMClient extends Serv
    * @return Current number of nodes in the cluster
    */
   public int getClusterNodeCount();
+
+  /**
+   * Get outstanding <code>StoredContainerRequest</code>s matching the given 
+   * parameters. These StoredContainerRequests should have been added via
+   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
+   * the AMRMClient may return its internal collection directly without creating 
+   * a copy. Users should not perform mutable operations on the return value.
+   * Each collection in the list contains requests with identical 
+   * <code>Resource</code> size that fit in the given capability. In a 
+   * collection, requests will be returned in the same order as they were added.
+   * @return Collection of request matching the parameters
+   */
+  public List<? extends Collection<T>> getMatchingRequests(
+                                           Priority priority, 
+                                           String resourceName, 
+                                           Resource capability);
+
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java?rev=1488487&r1=1488486&r2=1488487&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java Sat Jun  1 08:31:50 2013
@@ -19,9 +19,11 @@
 package org.apache.hadoop.yarn.client;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +40,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.service.AbstractService;
 
@@ -88,55 +92,50 @@ import com.google.common.annotations.Vis
  */
 @Unstable
 @Evolving
-public class AMRMClientAsync extends AbstractService {
+public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService {
   
   private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
   
-  private final AMRMClient client;
-  private final int intervalMs;
+  private final AMRMClient<T> client;
+  private final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
   private final HeartbeatThread heartbeatThread;
   private final CallbackHandlerThread handlerThread;
   private final CallbackHandler handler;
 
   private final BlockingQueue<AllocateResponse> responseQueue;
   
+  private final Object unregisterHeartbeatLock = new Object();
+  
   private volatile boolean keepRunning;
   private volatile float progress;
   
+  private volatile Exception savedException;
+  
   public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
       CallbackHandler callbackHandler) {
-    this(new AMRMClientImpl(id), intervalMs, callbackHandler);
+    this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
   }
   
   @Private
   @VisibleForTesting
-  AMRMClientAsync(AMRMClient client, int intervalMs,
+  public AMRMClientAsync(AMRMClient<T> client, int intervalMs,
       CallbackHandler callbackHandler) {
     super(AMRMClientAsync.class.getName());
     this.client = client;
-    this.intervalMs = intervalMs;
+    this.heartbeatIntervalMs.set(intervalMs);
     handler = callbackHandler;
     heartbeatThread = new HeartbeatThread();
     handlerThread = new CallbackHandlerThread();
     responseQueue = new LinkedBlockingQueue<AllocateResponse>();
     keepRunning = true;
+    savedException = null;
   }
-  
-  /**
-   * Sets the application's current progress. It will be transmitted to the
-   * resource manager on the next heartbeat.
-   * @param progress
-   *    the application's progress so far
-   */
-  public void setProgress(float progress) {
-    this.progress = progress;
-  }
-  
+    
   @Override
   public void init(Configuration conf) {
     super.init(conf);
     client.init(conf);
-  }
+  }  
   
   @Override
   public void start() {
@@ -171,6 +170,17 @@ public class AMRMClientAsync extends Abs
     super.stop();
   }
   
+  public void setHeartbeatInterval(int interval) {
+    heartbeatIntervalMs.set(interval);
+  }
+  
+  public List<? extends Collection<T>> getMatchingRequests(
+                                                   Priority priority, 
+                                                   String resourceName, 
+                                                   Resource capability) {
+    return client.getMatchingRequests(priority, resourceName, capability);
+  }
+  
   /**
    * Registers this application master with the resource manager. On successful
    * registration, starts the heartbeating thread.
@@ -180,8 +190,8 @@ public class AMRMClientAsync extends Abs
   public RegisterApplicationMasterResponse registerApplicationMaster(
       String appHostName, int appHostPort, String appTrackingUrl)
       throws YarnRemoteException, IOException {
-    RegisterApplicationMasterResponse response =
-        client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
+    RegisterApplicationMasterResponse response = client
+        .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
     heartbeatThread.start();
     return response;
   }
@@ -195,8 +205,9 @@ public class AMRMClientAsync extends Abs
    * @throws IOException
    */
   public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
-      String appMessage, String appTrackingUrl) throws YarnRemoteException, IOException {
-    synchronized (client) {
+      String appMessage, String appTrackingUrl) throws YarnRemoteException,
+      IOException {
+    synchronized (unregisterHeartbeatLock) {
       keepRunning = false;
       client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
     }
@@ -206,7 +217,7 @@ public class AMRMClientAsync extends Abs
    * Request containers for resources before calling <code>allocate</code>
    * @param req Resource request
    */
-  public void addContainerRequest(AMRMClient.ContainerRequest req) {
+  public void addContainerRequest(T req) {
     client.addContainerRequest(req);
   }
 
@@ -217,7 +228,7 @@ public class AMRMClientAsync extends Abs
    * even after the remove request
    * @param req Resource request
    */
-  public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+  public void removeContainerRequest(T req) {
     client.removeContainerRequest(req);
   }
 
@@ -259,7 +270,7 @@ public class AMRMClientAsync extends Abs
       while (true) {
         AllocateResponse response = null;
         // synchronization ensures we don't send heartbeats after unregistering
-        synchronized (client) {
+        synchronized (unregisterHeartbeatLock) {
           if (!keepRunning) {
             break;
           }
@@ -267,9 +278,17 @@ public class AMRMClientAsync extends Abs
           try {
             response = client.allocate(progress);
           } catch (YarnRemoteException ex) {
-            LOG.error("Failed to heartbeat", ex);
+            LOG.error("Yarn exception on heartbeat", ex);
+            savedException = ex;
+            // interrupt handler thread in case it waiting on the queue
+            handlerThread.interrupt();
+            break;
           } catch (IOException e) {
-            LOG.error("Failed to heartbeat", e);
+            LOG.error("IO exception on heartbeat", e);
+            savedException = e;
+            // interrupt handler thread in case it waiting on the queue
+            handlerThread.interrupt();
+            break;
           }
         }
         if (response != null) {
@@ -278,15 +297,15 @@ public class AMRMClientAsync extends Abs
               responseQueue.put(response);
               break;
             } catch (InterruptedException ex) {
-              LOG.warn("Interrupted while waiting to put on response queue", ex);
+              LOG.info("Interrupted while waiting to put on response queue", ex);
             }
           }
         }
         
         try {
-          Thread.sleep(intervalMs);
+          Thread.sleep(heartbeatIntervalMs.get());
         } catch (InterruptedException ex) {
-          LOG.warn("Heartbeater interrupted", ex);
+          LOG.info("Heartbeater interrupted", ex);
         }
       }
     }
@@ -301,14 +320,21 @@ public class AMRMClientAsync extends Abs
       while (keepRunning) {
         AllocateResponse response;
         try {
+          if(savedException != null) {
+            LOG.error("Stopping callback due to: ", savedException);
+            handler.onError(savedException);
+            break;
+          }
           response = responseQueue.take();
         } catch (InterruptedException ex) {
-          LOG.info("Interrupted while waiting for queue");
+          LOG.info("Interrupted while waiting for queue", ex);
           continue;
         }
 
         if (response.getReboot()) {
           handler.onRebootRequest();
+          LOG.info("Reboot requested. Stopping callback.");
+          break;
         }
         List<NodeReport> updatedNodes = response.getUpdatedNodes();
         if (!updatedNodes.isEmpty()) {
@@ -325,6 +351,8 @@ public class AMRMClientAsync extends Abs
         if (!allocated.isEmpty()) {
           handler.onContainersAllocated(allocated);
         }
+        
+        progress = handler.getProgress();
       }
     }
   }
@@ -347,14 +375,19 @@ public class AMRMClientAsync extends Abs
     
     /**
      * Called when the ResourceManager wants the ApplicationMaster to reboot
-     * for being out of sync.
+     * for being out of sync. The ApplicationMaster should not unregister with 
+     * the RM unless the ApplicationMaster wants to be the last attempt.
      */
     public void onRebootRequest();
     
     /**
-     * Called when nodes tracked by the ResourceManager have changed in in health,
+     * Called when nodes tracked by the ResourceManager have changed in health,
      * availability etc.
      */
     public void onNodesUpdated(List<NodeReport> updatedNodes);
+    
+    public float getProgress();
+    
+    public void onError(Exception e);
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1488487&r1=1488486&r2=1488487&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Sat Jun  1 08:31:50 2013
@@ -22,9 +22,15 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
@@ -47,6 +53,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -55,8 +62,11 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 
+// TODO check inputs for null etc. YARN-654
+
 @Unstable
-public class AMRMClientImpl extends AbstractService implements AMRMClient {
+public class AMRMClientImpl<T extends ContainerRequest> 
+                          extends AbstractService implements AMRMClient<T> {
 
   private static final Log LOG = LogFactory.getLog(AMRMClientImpl.class);
   
@@ -70,6 +80,57 @@ public class AMRMClientImpl extends Abst
   protected Resource clusterAvailableResources;
   protected int clusterNodeCount;
   
+  class ResourceRequestInfo {
+    ResourceRequest remoteRequest;
+    LinkedHashSet<T> containerRequests;
+    
+    ResourceRequestInfo(Priority priority, String resourceName,
+        Resource capability) {
+      remoteRequest = BuilderUtils.newResourceRequest(priority, resourceName,
+          capability, 0);
+      containerRequests = new LinkedHashSet<T>();
+    }
+  }
+  
+  
+  /**
+   * Class compares Resource by memory then cpu in reverse order
+   */
+  class ResourceReverseMemoryThenCpuComparator implements Comparator<Resource> {
+    @Override
+    public int compare(Resource arg0, Resource arg1) {
+      int mem0 = arg0.getMemory();
+      int mem1 = arg1.getMemory();
+      int cpu0 = arg0.getVirtualCores();
+      int cpu1 = arg1.getVirtualCores();
+      if(mem0 == mem1) {
+        if(cpu0 == cpu1) {
+          return 0;
+        }
+        if(cpu0 < cpu1) {
+          return 1;
+        }
+        return -1;
+      }
+      if(mem0 < mem1) { 
+        return 1;
+      }
+      return -1;
+    }    
+  }
+  
+  static boolean canFit(Resource arg0, Resource arg1) {
+    int mem0 = arg0.getMemory();
+    int mem1 = arg1.getMemory();
+    int cpu0 = arg0.getVirtualCores();
+    int cpu1 = arg1.getVirtualCores();
+    
+    if(mem0 <= mem1 && cpu0 <= cpu1) { 
+      return true;
+    }
+    return false; 
+  }
+  
   //Key -> Priority
   //Value -> Map
   //Key->ResourceName (e.g., hostname, rackname, *)
@@ -77,9 +138,9 @@ public class AMRMClientImpl extends Abst
   //Key->Resource Capability
   //Value->ResourceRequest
   protected final 
-  Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
+  Map<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>
     remoteRequestsTable =
-    new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
+    new TreeMap<Priority, Map<String, TreeMap<Resource, ResourceRequestInfo>>>();
 
   protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
@@ -223,42 +284,47 @@ public class AMRMClientImpl extends Abst
   }
   
   @Override
-  public synchronized void addContainerRequest(ContainerRequest req) {
+  public synchronized void addContainerRequest(T req) {
     // Create resource requests
-    if(req.hosts != null) {
+    // add check for dup locations
+    if (req.hosts != null) {
       for (String host : req.hosts) {
-        addResourceRequest(req.priority, host, req.capability, req.containerCount);
+        addResourceRequest(req.priority, host, req.capability,
+            req.containerCount, req);
       }
     }
 
-    if(req.racks != null) {
+    if (req.racks != null) {
       for (String rack : req.racks) {
-        addResourceRequest(req.priority, rack, req.capability, req.containerCount);
+        addResourceRequest(req.priority, rack, req.capability,
+            req.containerCount, req);
       }
     }
 
     // Off-switch
     addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
-        req.containerCount);
+        req.containerCount, req);
   }
 
   @Override
-  public synchronized void removeContainerRequest(ContainerRequest req) {
+  public synchronized void removeContainerRequest(T req) {
     // Update resource requests
-    if(req.hosts != null) {
+    if (req.hosts != null) {
       for (String hostName : req.hosts) {
-        decResourceRequest(req.priority, hostName, req.capability, req.containerCount);
+        decResourceRequest(req.priority, hostName, req.capability,
+            req.containerCount, req);
       }
     }
-    
-    if(req.racks != null) {
+
+    if (req.racks != null) {
       for (String rack : req.racks) {
-        decResourceRequest(req.priority, rack, req.capability, req.containerCount);
+        decResourceRequest(req.priority, rack, req.capability,
+            req.containerCount, req);
       }
     }
-   
+
     decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
-        req.containerCount);
+        req.containerCount, req);
   }
 
   @Override
@@ -276,6 +342,44 @@ public class AMRMClientImpl extends Abst
     return clusterNodeCount;
   }
   
+  @Override
+  public synchronized List<? extends Collection<T>> getMatchingRequests(
+                                          Priority priority, 
+                                          String resourceName, 
+                                          Resource capability) {
+    List<LinkedHashSet<T>> list = new LinkedList<LinkedHashSet<T>>();
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests = 
+        this.remoteRequestsTable.get(priority);
+    if (remoteRequests == null) {
+      return list;
+    }
+    TreeMap<Resource, ResourceRequestInfo> reqMap = remoteRequests
+        .get(resourceName);
+    if (reqMap == null) {
+      return list;
+    }
+
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+    if (resourceRequestInfo != null) {
+      list.add(resourceRequestInfo.containerRequests);
+      return list;
+    }
+    
+    // no exact match. Container may be larger than what was requested.
+    // get all resources <= capability. map is reverse sorted. 
+    SortedMap<Resource, ResourceRequestInfo> tailMap = 
+                                                  reqMap.tailMap(capability);
+    for(Map.Entry<Resource, ResourceRequestInfo> entry : tailMap.entrySet()) {
+      if(canFit(entry.getKey(), capability)) {
+        // match found that fits in the larger resource
+        list.add(entry.getValue().containerRequests);
+      }
+    }
+    
+    // no match found
+    return list;          
+  }
+  
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
     // This code looks weird but is needed because of the following scenario.
     // A ResourceRequest is removed from the remoteRequestTable. A 0 container 
@@ -294,44 +398,57 @@ public class AMRMClientImpl extends Abst
   }
 
   private void addResourceRequest(Priority priority, String resourceName,
-      Resource capability, int containerCount) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+      Resource capability, int containerCount, T req) {
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
-      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
+      remoteRequests = 
+          new HashMap<String, TreeMap<Resource, ResourceRequestInfo>>();
       this.remoteRequestsTable.put(priority, remoteRequests);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Added priority=" + priority);
       }
     }
-    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    TreeMap<Resource, ResourceRequestInfo> reqMap = 
+                                          remoteRequests.get(resourceName);
     if (reqMap == null) {
-      reqMap = new HashMap<Resource, ResourceRequest>();
+      // capabilities are stored in reverse sorted order. smallest last.
+      reqMap = new TreeMap<Resource, ResourceRequestInfo>(
+          new ResourceReverseMemoryThenCpuComparator());
       remoteRequests.put(resourceName, reqMap);
     }
-    ResourceRequest remoteRequest = reqMap.get(capability);
-    if (remoteRequest == null) {
-      remoteRequest = BuilderUtils.
-          newResourceRequest(priority, resourceName, capability, 0);
-      reqMap.put(capability, remoteRequest);
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
+    if (resourceRequestInfo == null) {
+      resourceRequestInfo =
+          new ResourceRequestInfo(priority, resourceName, capability);
+      reqMap.put(capability, resourceRequestInfo);
     }
     
-    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + containerCount);
+    resourceRequestInfo.remoteRequest.setNumContainers(
+         resourceRequestInfo.remoteRequest.getNumContainers() + containerCount);
+
+    if(req instanceof StoredContainerRequest) {
+      resourceRequestInfo.containerRequests.add(req);
+    }
 
     // Note this down for next interaction with ResourceManager
-    addResourceRequestToAsk(remoteRequest);
+    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("addResourceRequest:" + " applicationId="
           + appAttemptId + " priority=" + priority.getPriority()
           + " resourceName=" + resourceName + " numContainers="
-          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+          + resourceRequestInfo.remoteRequest.getNumContainers() 
+          + " #asks=" + ask.size());
     }
   }
 
-  private void decResourceRequest(Priority priority, String resourceName,
-      Resource capability, int containerCount) {
-    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+  private void decResourceRequest(Priority priority, 
+                                   String resourceName,
+                                   Resource capability, 
+                                   int containerCount, 
+                                   T req) {
+    Map<String, TreeMap<Resource, ResourceRequestInfo>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     
     if(remoteRequests == null) {
@@ -342,7 +459,7 @@ public class AMRMClientImpl extends Abst
       return;
     }
     
-    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+    Map<Resource, ResourceRequestInfo> reqMap = remoteRequests.get(resourceName);
     if (reqMap == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Not decrementing resource as " + resourceName
@@ -350,28 +467,34 @@ public class AMRMClientImpl extends Abst
       }
       return;
     }
-    ResourceRequest remoteRequest = reqMap.get(capability);
+    ResourceRequestInfo resourceRequestInfo = reqMap.get(capability);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("BEFORE decResourceRequest:" + " applicationId="
           + appAttemptId + " priority=" + priority.getPriority()
           + " resourceName=" + resourceName + " numContainers="
-          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+          + resourceRequestInfo.remoteRequest.getNumContainers() 
+          + " #asks=" + ask.size());
     }
 
-    remoteRequest.
-        setNumContainers(remoteRequest.getNumContainers() - containerCount);
-    if(remoteRequest.getNumContainers() < 0) {
+    resourceRequestInfo.remoteRequest.setNumContainers(
+        resourceRequestInfo.remoteRequest.getNumContainers() - containerCount);
+
+    if(req instanceof StoredContainerRequest) {
+      resourceRequestInfo.containerRequests.remove(req);
+    }
+    
+    if(resourceRequestInfo.remoteRequest.getNumContainers() < 0) {
       // guard against spurious removals
-      remoteRequest.setNumContainers(0);
+      resourceRequestInfo.remoteRequest.setNumContainers(0);
     }
     // send the ResourceRequest to RM even if is 0 because it needs to override
     // a previously sent value. If ResourceRequest was not sent previously then
     // sending 0 aught to be a no-op on RM
-    addResourceRequestToAsk(remoteRequest);
+    addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
 
     // delete entries from map if no longer needed
-    if (remoteRequest.getNumContainers() == 0) {
+    if (resourceRequestInfo.remoteRequest.getNumContainers() == 0) {
       reqMap.remove(capability);
       if (reqMap.size() == 0) {
         remoteRequests.remove(resourceName);
@@ -385,7 +508,8 @@ public class AMRMClientImpl extends Abst
       LOG.info("AFTER decResourceRequest:" + " applicationId="
           + appAttemptId + " priority=" + priority.getPriority()
           + " resourceName=" + resourceName + " numContainers="
-          + remoteRequest.getNumContainers() + " #asks=" + ask.size());
+          + resourceRequestInfo.remoteRequest.getNumContainers() 
+          + " #asks=" + ask.size());
     }
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1488487&r1=1488486&r2=1488487&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Sat Jun  1 08:31:50 2013
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -50,27 +51,38 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 public class TestAMRMClient {
-  Configuration conf = null;
-  MiniYARNCluster yarnCluster = null;
-  YarnClientImpl yarnClient = null;
-  List<NodeReport> nodeReports = null;
-  ApplicationAttemptId attemptId = null;
-  int nodeCount = 3;
+  static Configuration conf = null;
+  static MiniYARNCluster yarnCluster = null;
+  static YarnClientImpl yarnClient = null;
+  static List<NodeReport> nodeReports = null;
+  static ApplicationAttemptId attemptId = null;
+  static int nodeCount = 3;
   
-  @Before
-  public void setup() throws YarnRemoteException, IOException {
+  static Resource capability;
+  static Priority priority;
+  static String node;
+  static String rack;
+  static String[] nodes;
+  static String[] racks;
+  
+  @BeforeClass
+  public static void setup() throws Exception {
     // start minicluster
     conf = new YarnConfiguration();
     yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
@@ -84,7 +96,17 @@ public class TestAMRMClient {
 
     // get node info
     nodeReports = yarnClient.getNodeReports();
-
+    
+    priority = BuilderUtils.newPriority(1);
+    capability = BuilderUtils.newResource(1024, 1);
+    node = nodeReports.get(0).getNodeId().getHost();
+    rack = nodeReports.get(0).getRackName();
+    nodes = new String[]{ node };
+    racks = new String[]{ rack };
+  }
+  
+  @Before
+  public void startApp() throws Exception {
     // submit new app
     GetNewApplicationResponse newApp = yarnClient.getNewApplication();
     ApplicationId appId = newApp.getApplicationId();
@@ -125,7 +147,12 @@ public class TestAMRMClient {
   }
   
   @After
-  public void tearDown() {
+  public void cancelApp() {
+    attemptId = null;
+  }
+  
+  @AfterClass
+  public static void tearDown() {
     if (yarnClient != null && yarnClient.getServiceState() == STATE.STARTED) {
       yarnClient.stop();
     }
@@ -133,13 +160,235 @@ public class TestAMRMClient {
       yarnCluster.stop();
     }
   }
+  
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFit() throws YarnRemoteException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+      
+      Resource capability1 = BuilderUtils.newResource(1024, 2);
+      Resource capability2 = BuilderUtils.newResource(1024, 1);
+      Resource capability3 = BuilderUtils.newResource(1000, 2);
+      Resource capability4 = BuilderUtils.newResource(2000, 1);
+      Resource capability5 = BuilderUtils.newResource(1000, 3);
+      Resource capability6 = BuilderUtils.newResource(2000, 1);
+      
+      StoredContainerRequest storedContainer1 = 
+          new StoredContainerRequest(capability1, nodes, racks, priority);
+      StoredContainerRequest storedContainer2 = 
+          new StoredContainerRequest(capability2, nodes, racks, priority);
+      StoredContainerRequest storedContainer3 = 
+          new StoredContainerRequest(capability3, nodes, racks, priority);
+      StoredContainerRequest storedContainer4 = 
+          new StoredContainerRequest(capability4, nodes, racks, priority);
+      StoredContainerRequest storedContainer5 = 
+          new StoredContainerRequest(capability5, nodes, racks, priority);
+      StoredContainerRequest storedContainer6 = 
+          new StoredContainerRequest(capability6, nodes, racks, priority);
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer2);
+      amClient.addContainerRequest(storedContainer3);
+      amClient.addContainerRequest(storedContainer4);
+      amClient.addContainerRequest(storedContainer5);
+      amClient.addContainerRequest(storedContainer6);
+      
+      // test matching of containers
+      List<? extends Collection<StoredContainerRequest>> matches;
+      StoredContainerRequest storedRequest;
+      // exact match
+      Resource testCapability1 = BuilderUtils.newResource(1024,  2);
+      matches = amClient.getMatchingRequests(priority, node, testCapability1);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      amClient.removeContainerRequest(storedContainer1);
+      
+      // exact matching with order maintained
+      Resource testCapability2 = BuilderUtils.newResource(2000, 1);
+      matches = amClient.getMatchingRequests(priority, node, testCapability2);
+      verifyMatches(matches, 2);
+      // must be returned in the order they were made
+      int i = 0;
+      for(StoredContainerRequest storedRequest1 : matches.get(0)) {
+        if(i++ == 0) {
+          assertTrue(storedContainer4 == storedRequest1);
+        } else {
+          assertTrue(storedContainer6 == storedRequest1);
+        }
+      }
+      amClient.removeContainerRequest(storedContainer6);
+      
+      // matching with larger container. all requests returned
+      Resource testCapability3 = BuilderUtils.newResource(4000, 4);
+      matches = amClient.getMatchingRequests(priority, node, testCapability3);
+      assert(matches.size() == 4);
+      
+      Resource testCapability4 = BuilderUtils.newResource(1024, 2);
+      matches = amClient.getMatchingRequests(priority, node, testCapability4);
+      assert(matches.size() == 2);
+      // verify non-fitting containers are not returned and fitting ones are
+      for(Collection<StoredContainerRequest> testSet : matches) {
+        assertTrue(testSet.size() == 1);
+        StoredContainerRequest testRequest = testSet.iterator().next();
+        assertTrue(testRequest != storedContainer4);
+        assertTrue(testRequest != storedContainer5);
+        assert(testRequest == storedContainer2 || 
+                testRequest == storedContainer3);
+      }
+      
+      Resource testCapability5 = BuilderUtils.newResource(512, 4);
+      matches = amClient.getMatchingRequests(priority, node, testCapability5);
+      assert(matches.size() == 0);
+      
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+  
+  private void verifyMatches(
+                  List<? extends Collection<StoredContainerRequest>> matches,
+                  int matchSize) {
+    assertTrue(matches.size() == 1);
+    assertTrue(matches.get(0).size() == matchSize);    
+  }
+
+  @Test (timeout=60000)
+  public void testAMRMClientMatchStorage() throws YarnRemoteException, IOException {
+    AMRMClientImpl<StoredContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = new AMRMClientImpl<StoredContainerRequest>(attemptId);
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+      
+      Priority priority1 = Records.newRecord(Priority.class);
+      priority1.setPriority(2);
+      
+      StoredContainerRequest storedContainer1 = 
+          new StoredContainerRequest(capability, nodes, racks, priority);
+      StoredContainerRequest storedContainer2 = 
+          new StoredContainerRequest(capability, nodes, racks, priority);
+      StoredContainerRequest storedContainer3 = 
+          new StoredContainerRequest(capability, null, null, priority1);
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer2);
+      amClient.addContainerRequest(storedContainer3);
+      
+      // test addition and storage
+      int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
+       .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+      assertTrue(containersRequestedAny == 2);
+      containersRequestedAny = amClient.remoteRequestsTable.get(priority1)
+          .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
+         assertTrue(containersRequestedAny == 1);
+      List<? extends Collection<StoredContainerRequest>> matches = 
+          amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 2);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 2);
+      matches = 
+          amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
+      verifyMatches(matches, 2);
+      matches = amClient.getMatchingRequests(priority1, rack, capability);
+      assertTrue(matches.isEmpty());
+      matches = 
+          amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
+      verifyMatches(matches, 1);
+      
+      // test removal
+      amClient.removeContainerRequest(storedContainer3);
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 2);
+      amClient.removeContainerRequest(storedContainer2);
+      matches = amClient.getMatchingRequests(priority, node, capability);
+      verifyMatches(matches, 1);
+      matches = amClient.getMatchingRequests(priority, rack, capability);
+      verifyMatches(matches, 1);
+      
+      // test matching of containers
+      StoredContainerRequest storedRequest = matches.get(0).iterator().next();
+      assertTrue(storedContainer1 == storedRequest);
+      amClient.removeContainerRequest(storedContainer1);
+      matches = 
+          amClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
+      assertTrue(matches.isEmpty());
+      matches = 
+          amClient.getMatchingRequests(priority1, ResourceRequest.ANY, capability);
+      assertTrue(matches.isEmpty());
+      // 0 requests left. everything got cleaned up
+      assertTrue(amClient.remoteRequestsTable.isEmpty());
+      
+      // go through an exemplary allocation, matching and release cycle
+      amClient.addContainerRequest(storedContainer1);
+      amClient.addContainerRequest(storedContainer3);
+      // RM should allocate container within 2 calls to allocate()
+      int allocatedContainerCount = 0;
+      int iterationsLeft = 2;
+      while (allocatedContainerCount < 2
+          && iterationsLeft-- > 0) {
+        AllocateResponse allocResponse = amClient.allocate(0.1f);
+        assertTrue(amClient.ask.size() == 0);
+        assertTrue(amClient.release.size() == 0);
+        
+        assertTrue(nodeCount == amClient.getClusterNodeCount());
+        allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+        for(Container container : allocResponse.getAllocatedContainers()) {
+          ContainerRequest expectedRequest = 
+              container.getPriority().equals(storedContainer1.getPriority()) ?
+                  storedContainer1 : storedContainer3;
+          matches = amClient.getMatchingRequests(container.getPriority(), 
+                                                 ResourceRequest.ANY, 
+                                                 container.getResource());
+          // test correct matched container is returned
+          verifyMatches(matches, 1);
+          ContainerRequest matchedRequest = matches.get(0).iterator().next();
+          assertTrue(matchedRequest == expectedRequest);
+          
+          // assign this container, use it and release it
+          amClient.releaseAssignedContainer(container.getId());
+        }
+        if(allocatedContainerCount < containersRequestedAny) {
+          // sleep to let NM's heartbeat to RM and trigger allocations
+          sleep(1000);
+        }
+      }
+      
+      assertTrue(allocatedContainerCount == 2);
+      assertTrue(amClient.release.size() == 2);
+      assertTrue(amClient.ask.size() == 0);
+      AllocateResponse allocResponse = amClient.allocate(0.1f);
+      assertTrue(amClient.release.size() == 0);
+      assertTrue(amClient.ask.size() == 0);
+      assertTrue(allocResponse.getAllocatedContainers().size() == 0);
+      
+      
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
 
   @Test (timeout=60000)
   public void testAMRMClient() throws YarnRemoteException, IOException {
-    AMRMClientImpl amClient = null;
+    AMRMClientImpl<ContainerRequest> amClient = null;
     try {
       // start am rm client
-      amClient = new AMRMClientImpl(attemptId);
+      amClient = new AMRMClientImpl<ContainerRequest>(attemptId);
       amClient.init(conf);
       amClient.start();
 
@@ -156,36 +405,27 @@ public class TestAMRMClient {
       }
     }
   }
-  
-  
-  private void testAllocation(final AMRMClientImpl amClient)  
+    
+  private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)  
       throws YarnRemoteException, IOException {
     // setup container request
-    final Resource capability = Records.newRecord(Resource.class);
-    final Priority priority = Records.newRecord(Priority.class);
-    priority.setPriority(0);
-    capability.setMemory(1024);
-    String node = nodeReports.get(0).getNodeId().getHost();
-    String rack = nodeReports.get(0).getRackName();
-    final String[] nodes = { node };
-    final String[] racks = { rack };
     
     assertTrue(amClient.ask.size() == 0);
     assertTrue(amClient.release.size() == 0);
     
-    amClient.addContainerRequest(new ContainerRequest(capability, nodes,
-        racks, priority, 1));
-    amClient.addContainerRequest(new ContainerRequest(capability, nodes,
-        racks, priority, 3));
-    amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
-        racks, priority, 2));
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 1));
+    amClient.addContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 3));
+    amClient.removeContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 2));
     
     int containersRequestedNode = amClient.remoteRequestsTable.get(priority)
-        .get(node).get(capability).getNumContainers();
+        .get(node).get(capability).remoteRequest.getNumContainers();
     int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
-        .get(rack).get(capability).getNumContainers();
+        .get(rack).get(capability).remoteRequest.getNumContainers();
     int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
-        .get(ResourceRequest.ANY).get(capability).getNumContainers();
+    .get(ResourceRequest.ANY).get(capability).remoteRequest.getNumContainers();
 
     assertTrue(containersRequestedNode == 2);
     assertTrue(containersRequestedRack == 2);
@@ -221,8 +461,8 @@ public class TestAMRMClient {
     assertTrue(amClient.ask.size() == 0);
     
     // need to tell the AMRMClient that we dont need these resources anymore
-    amClient.removeContainerRequest(new ContainerRequest(capability, nodes,
-        racks, priority, 2));
+    amClient.removeContainerRequest(
+        new ContainerRequest(capability, nodes, racks, priority, 2));
     assertTrue(amClient.ask.size() == 3);
     // send 0 container count request for resources that are no longer needed
     ResourceRequest snoopRequest = amClient.ask.iterator().next();
@@ -241,8 +481,9 @@ public class TestAMRMClient {
           new Answer<AllocateResponse>() {
             public AllocateResponse answer(InvocationOnMock invocation)
                 throws Exception {
-              amClient.removeContainerRequest(new ContainerRequest(capability,
-                  nodes, racks, priority, 2));
+              amClient.removeContainerRequest(
+                             new ContainerRequest(capability, nodes, 
+                                                          racks, priority, 2));
               throw new Exception();
             }
           });

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java?rev=1488487&r1=1488486&r2=1488487&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java Sat Jun  1 08:31:50 2013
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
@@ -39,6 +40,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -48,9 +52,11 @@ public class TestAMRMClientAsync {
 
   private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
   
+  @SuppressWarnings("unchecked")
   @Test(timeout=10000)
   public void testAMRMClientAsync() throws Exception {
     Configuration conf = new Configuration();
+    final AtomicBoolean heartbeatBlock = new AtomicBoolean(true);
     List<ContainerStatus> completed1 = Arrays.asList(
         BuilderUtils.newContainerStatus(
             BuilderUtils.newContainerId(0, 0, 0, 0),
@@ -65,20 +71,38 @@ public class TestAMRMClientAsync {
         new ArrayList<ContainerStatus>(), new ArrayList<Container>());
 
     TestCallbackHandler callbackHandler = new TestCallbackHandler();
-    AMRMClient client = mock(AMRMClient.class);
-    final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false);
+    final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+    final AtomicInteger secondHeartbeatSync = new AtomicInteger(0);
     when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
       @Override
       public AllocateResponse answer(InvocationOnMock invocation)
           throws Throwable {
-        secondHeartbeatReceived.set(true);
+        secondHeartbeatSync.incrementAndGet();
+        while(heartbeatBlock.get()) {
+          synchronized(heartbeatBlock) {
+            heartbeatBlock.wait();
+          }
+        }
+        secondHeartbeatSync.incrementAndGet();
         return response2;
       }
     }).thenReturn(emptyResponse);
     when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
       .thenReturn(null);
+    when(client.getClusterAvailableResources()).thenAnswer(new Answer<Resource>() {
+      @Override
+      public Resource answer(InvocationOnMock invocation)
+          throws Throwable {
+        // take client lock to simulate behavior of real impl
+        synchronized (client) { 
+          Thread.sleep(10);
+        }
+        return null;
+      }
+    });
     
-    AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler);
+    AMRMClientAsync<ContainerRequest> asyncClient = 
+        new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
     asyncClient.init(conf);
     asyncClient.start();
     asyncClient.registerApplicationMaster("localhost", 1234, null);
@@ -86,10 +110,21 @@ public class TestAMRMClientAsync {
     // while the CallbackHandler will still only be processing the first response,
     // heartbeater thread should still be sending heartbeats.
     // To test this, wait for the second heartbeat to be received. 
-    while (!secondHeartbeatReceived.get()) {
+    while (secondHeartbeatSync.get() < 1) {
       Thread.sleep(10);
     }
     
+    // heartbeat will be blocked. make sure we can call client methods at this
+    // time. Checks that heartbeat is not holding onto client lock
+    assert(secondHeartbeatSync.get() < 2);
+    asyncClient.getClusterAvailableResources();
+    // method returned. now unblock heartbeat
+    assert(secondHeartbeatSync.get() < 2);
+    synchronized (heartbeatBlock) {
+      heartbeatBlock.set(false);
+      heartbeatBlock.notifyAll();
+    }
+    
     // allocated containers should come before completed containers
     Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
     
@@ -110,6 +145,73 @@ public class TestAMRMClientAsync {
     Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
   }
   
+  @Test(timeout=10000)
+  public void testAMRMClientAsyncException() throws Exception {
+    Configuration conf = new Configuration();
+    TestCallbackHandler callbackHandler = new TestCallbackHandler();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+    String exStr = "TestException";
+    YarnRemoteException mockException = mock(YarnRemoteException.class);
+    when(mockException.getMessage()).thenReturn(exStr);
+    when(client.allocate(anyFloat())).thenThrow(mockException);
+
+    AMRMClientAsync<ContainerRequest> asyncClient = 
+        new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
+    asyncClient.init(conf);
+    asyncClient.start();
+    
+    synchronized (callbackHandler.notifier) {
+      asyncClient.registerApplicationMaster("localhost", 1234, null);
+      while(callbackHandler.savedException == null) {
+        try {
+          callbackHandler.notifier.wait();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    
+    Assert.assertTrue(callbackHandler.savedException.getMessage().contains(exStr));
+    
+    asyncClient.stop();
+    // stopping should have joined all threads and completed all callbacks
+    Assert.assertTrue(callbackHandler.callbackCount == 0);
+  }
+  
+  @Test(timeout=10000)
+  public void testAMRMClientAsyncReboot() throws Exception {
+    Configuration conf = new Configuration();
+    TestCallbackHandler callbackHandler = new TestCallbackHandler();
+    @SuppressWarnings("unchecked")
+    AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
+    
+    final AllocateResponse rebootResponse = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>());
+    rebootResponse.setReboot(true);
+    when(client.allocate(anyFloat())).thenReturn(rebootResponse);
+    
+    AMRMClientAsync<ContainerRequest> asyncClient = 
+        new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
+    asyncClient.init(conf);
+    asyncClient.start();
+    
+    synchronized (callbackHandler.notifier) {
+      asyncClient.registerApplicationMaster("localhost", 1234, null);
+      while(callbackHandler.reboot == false) {
+        try {
+          callbackHandler.notifier.wait();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    
+    asyncClient.stop();
+    // stopping should have joined all threads and completed all callbacks
+    Assert.assertTrue(callbackHandler.callbackCount == 0);
+  }
+  
   private AllocateResponse createAllocateResponse(
       List<ContainerStatus> completed, List<Container> allocated) {
     AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
@@ -120,6 +222,11 @@ public class TestAMRMClientAsync {
   private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
     private volatile List<ContainerStatus> completedContainers;
     private volatile List<Container> allocatedContainers;
+    Exception savedException = null;
+    boolean reboot = false;
+    Object notifier = new Object();
+    
+    int callbackCount = 0;
     
     public List<ContainerStatus> takeCompletedContainers() {
       List<ContainerStatus> ret = completedContainers;
@@ -176,9 +283,28 @@ public class TestAMRMClientAsync {
     }
 
     @Override
-    public void onRebootRequest() {}
+    public void onRebootRequest() {
+      reboot = true;
+      synchronized (notifier) {
+        notifier.notifyAll();        
+      }
+    }
 
     @Override
     public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+
+    @Override
+    public float getProgress() {
+      callbackCount++;
+      return 0.5f;
+    }
+
+    @Override
+    public void onError(Exception e) {
+      savedException = e;
+      synchronized (notifier) {
+        notifier.notifyAll();        
+      }
+    }
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java?rev=1488487&r1=1488486&r2=1488487&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java Sat Jun  1 08:31:50 2013
@@ -64,7 +64,7 @@ public class TestNMClient {
   Configuration conf = null;
   MiniYARNCluster yarnCluster = null;
   YarnClientImpl yarnClient = null;
-  AMRMClientImpl rmClient = null;
+  AMRMClientImpl<ContainerRequest> rmClient = null;
   NMClientImpl nmClient = null;
   List<NodeReport> nodeReports = null;
   ApplicationAttemptId attemptId = null;
@@ -136,7 +136,7 @@ public class TestNMClient {
     }
 
     // start am rm client
-    rmClient = new AMRMClientImpl(attemptId);
+    rmClient = new AMRMClientImpl<ContainerRequest>(attemptId);
     rmClient.init(conf);
     rmClient.start();
     assertNotNull(rmClient);
@@ -185,7 +185,8 @@ public class TestNMClient {
         null, null);
   }
 
-  private Set<Container> allocateContainers(AMRMClientImpl rmClient, int num)
+  private Set<Container> allocateContainers(
+      AMRMClientImpl<ContainerRequest> rmClient, int num)
       throws YarnRemoteException, IOException {
     // setup container request
     Resource capability = Resource.newInstance(1024, 0);
@@ -201,7 +202,8 @@ public class TestNMClient {
     }
 
     int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
-        .get(ResourceRequest.ANY).get(capability).getNumContainers();
+        .get(ResourceRequest.ANY).get(capability).remoteRequest
+        .getNumContainers();
 
     // RM should allocate container within 2 calls to allocate()
     int allocatedContainerCount = 0;