You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/18 06:02:48 UTC

svn commit: r1494017 [1/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ hadoop-yarn/hadoop-yarn-appl...

Author: vinodkv
Date: Tue Jun 18 04:02:47 2013
New Revision: 1494017

URL: http://svn.apache.org/r1494017
Log:
YARN-834. Fixed annotations for yarn-client module, reorganized packages and clearly differentiated *Async apis. Contributed by Arun C Murthy and Zhijie Shen.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
Removed:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jun 18 04:02:47 2013
@@ -186,6 +186,10 @@ Release 2.1.0-beta - UNRELEASED
     YARN-610. ClientToken is no longer set in the environment of the Containers.
     (Omkar Vinit Joshi via vinodkv)
 
+    YARN-834. Fixed annotations for yarn-client module, reorganized packages and
+    clearly differentiated *Async apis. (Arun C Murthy and Zhijie Shen via
+    vinodkv)
+
   NEW FEATURES
 
     YARN-482. FS: Extend SchedulingMode to intermediate queues. 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Jun 18 04:02:47 2013
@@ -67,9 +67,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -436,17 +436,18 @@ public class ApplicationMaster {
    * @throws YarnException
    * @throws IOException
    */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings({ "unchecked" })
   public boolean run() throws YarnException, IOException {
     LOG.info("Starting ApplicationMaster");
 
     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
-    resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
+    resourceManager = 
+        AMRMClientAsync.createAMRMClientAsync(appAttemptID, 1000, allocListener);
     resourceManager.init(conf);
     resourceManager.start();
 
     containerListener = new NMCallbackHandler();
-    nmClientAsync = new NMClientAsync(containerListener);
+    nmClientAsync = NMClientAsync.createNMClientAsync(containerListener);
     nmClientAsync.init(conf);
     nmClientAsync.start();
 
@@ -682,7 +683,7 @@ public class ApplicationMaster {
       }
       Container container = containers.get(containerId);
       if (container != null) {
-        nmClientAsync.getContainerStatus(containerId, container.getNodeId(),
+        nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId(),
             container.getContainerToken());
       }
     }
@@ -802,7 +803,7 @@ public class ApplicationMaster {
       ctx.setCommands(commands);
 
       containerListener.addContainer(container.getId(), container);
-      nmClientAsync.startContainer(container, ctx);
+      nmClientAsync.startContainerAsync(container, ctx);
     }
   }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Tue Jun 18 04:02:47 2013
@@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java Tue Jun 18 04:02:47 2013
@@ -48,8 +48,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,262 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.collect.ImmutableList;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
+    AbstractService {
+
+  /**
+   * Create a new instance of AMRMClient.
+   * For usage:
+   * <pre>
+   * {@code
+   * AMRMClient.<T>createAMRMClient(appAttemptId)
+   * }</pre>
+   * @param appAttemptId the appAttemptId associated with the AMRMClient
+   * @return the newly created AMRMClient instance.
+   */
+  @Public
+  public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient(
+      ApplicationAttemptId appAttemptId) {
+    AMRMClient<T> client = new AMRMClientImpl<T>(appAttemptId);
+    return client;
+  }
+
+  @Private
+  protected AMRMClient(String name) {
+    super(name);
+  }
+
+  /**
+   * Object to represent container request for resources. Scheduler
+   * documentation should be consulted for the specifics of how the parameters
+   * are honored.
+   * All getters return immutable values.
+   * 
+   * @param capability
+   *    The {@link Resource} to be requested for each container.
+   * @param nodes
+   *    Any hosts to request that the containers are placed on.
+   * @param racks
+   *    Any racks to request that the containers are placed on. The racks
+   *    corresponding to any hosts requested will be automatically added to
+   *    this list.
+   * @param priority
+   *    The priority at which to request the containers. Higher priorities have
+   *    lower numerical values.
+   * @param containerCount
+   *    The number of containers to request.
+   */
+  public static class ContainerRequest {
+    final Resource capability;
+    final List<String> nodes;
+    final List<String> racks;
+    final Priority priority;
+    final int containerCount;
+        
+    public ContainerRequest(Resource capability, String[] nodes,
+        String[] racks, Priority priority, int containerCount) {
+      this.capability = capability;
+      this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
+      this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
+      this.priority = priority;
+      this.containerCount = containerCount;
+    }
+    
+    public Resource getCapability() {
+      return capability;
+    }
+    
+    public List<String> getNodes() {
+      return nodes;
+    }
+    
+    public List<String> getRacks() {
+      return racks;
+    }
+    
+    public Priority getPriority() {
+      return priority;
+    }
+    
+    public int getContainerCount() {
+      return containerCount;
+    }
+    
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Capability[").append(capability).append("]");
+      sb.append("Priority[").append(priority).append("]");
+      sb.append("ContainerCount[").append(containerCount).append("]");
+      return sb.toString();
+    }
+  }
+ 
+  /**
+   * This creates a <code>ContainerRequest</code> for 1 container and the
+   * AMRMClient stores this request internally. <code>getMatchingRequests</code>
+   * can be used to retrieve these requests from AMRMClient. These requests may 
+   * be matched with an allocated container to determine which request to assign
+   * the container to. <code>removeContainerRequest</code> must be called using 
+   * the same assigned <code>StoredContainerRequest</code> object so that 
+   * AMRMClient can remove it from its internal store.
+   */
+  public static class StoredContainerRequest extends ContainerRequest {    
+    public StoredContainerRequest(Resource capability, String[] nodes,
+        String[] racks, Priority priority) {
+      super(capability, nodes, racks, priority, 1);
+    }
+  }
+  
+  /**
+   * Register the application master. This must be called before any 
+   * other interaction
+   * @param appHostName Name of the host on which master is running
+   * @param appHostPort Port master is listening on
+   * @param appTrackingUrl URL at which the master info can be seen
+   * @return <code>RegisterApplicationMasterResponse</code>
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract RegisterApplicationMasterResponse 
+               registerApplicationMaster(String appHostName,
+                                         int appHostPort,
+                                         String appTrackingUrl) 
+               throws YarnException, IOException;
+  
+  /**
+   * Request additional containers and receive new container allocations.
+   * Requests made via <code>addContainerRequest</code> are sent to the 
+   * <code>ResourceManager</code>. New containers assigned to the master are 
+   * retrieved. Status of completed containers and node health updates are 
+   * also retrieved.
+   * This also doubles up as a heartbeat to the ResourceManager and must be 
+   * made periodically.
+   * The call may not always return any new allocations of containers.
+   * App should not make concurrent allocate requests. May cause request loss.
+   * @param progressIndicator Indicates progress made by the master
+   * @return the response of the allocate request
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract AllocateResponse allocate(float progressIndicator) 
+                           throws YarnException, IOException;
+  
+  /**
+   * Unregister the application master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+                                           String appMessage,
+                                           String appTrackingUrl) 
+               throws YarnException, IOException;
+  
+  /**
+   * Request containers for resources before calling <code>allocate</code>
+   * @param req Resource request
+   */
+  public abstract void addContainerRequest(T req);
+  
+  /**
+   * Remove a previous container request. The previous container request may
+   * have already been sent to the ResourceManager, so even after the remove
+   * request the app must be prepared to receive an allocation for the
+   * previous request.
+   * @param req Resource request
+   */
+  public abstract void removeContainerRequest(T req);
+  
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give up the container then it can release them.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it, e.g. it released non-local resources.
+   * @param containerId
+   */
+  public abstract void releaseAssignedContainer(ContainerId containerId);
+  
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Currently available resources
+   */
+  public abstract Resource getClusterAvailableResources();
+  
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Current number of nodes in the cluster
+   */
+  public abstract int getClusterNodeCount();
+
+  /**
+   * Get outstanding <code>StoredContainerRequest</code>s matching the given 
+   * parameters. These StoredContainerRequests should have been added via
+   * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
+   * the AMRMClient may return its internal collection directly without creating 
+   * a copy. Users should not perform mutable operations on the return value.
+   * Each collection in the list contains requests with identical 
+   * <code>Resource</code> size that fit in the given capability. In a 
+   * collection, requests will be returned in the same order as they were added.
+   * @return Collection of requests matching the parameters
+   */
+  public abstract List<? extends Collection<T>> getMatchingRequests(
+                                           Priority priority, 
+                                           String resourceName, 
+                                           Resource capability);
+
+  /**
+   * It returns the NMToken received on allocate call. It will not communicate
+   * with RM to get NMTokens. On allocate call whenever we receive new token
+   * along with container AMRMClient will cache this NMToken per node manager.
+   * This map returned should be shared with any application which is
+   * communicating with NodeManager (ex. NMClient) using NMTokens. If a new
+   * NMToken is received for the same node manager then it will be replaced. 
+   */
+  public abstract ConcurrentMap<String, Token> getNMTokens();
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class NMClient extends AbstractService {
+
+  /**
+   * Create a new instance of NMClient.
+   */
+  @Public
+  public static NMClient createNMClient() {
+    NMClient client = new NMClientImpl();
+    return client;
+  }
+
+  /**
+   * Create a new instance of NMClient with the given service name.
+   */
+  @Public
+  public static NMClient createNMClient(String name) {
+    NMClient client = new NMClientImpl(name);
+    return client;
+  }
+
+  @Private
+  protected NMClient(String name) {
+    super(name);
+  }
+
+  /**
+   * <p>Start an allocated container.</p>
+   *
+   * <p>The <code>ApplicationMaster</code> or other applications that use the
+   * client must provide the details of the allocated container, including the
+   * Id, the assigned node's Id and the token via {@link Container}. In
+   * addition, the AM needs to provide the {@link ContainerLaunchContext} as
+   * well.</p>
+   *
+   * @param container the allocated container
+   * @param containerLaunchContext the context information needed by the
+   *                               <code>NodeManager</code> to launch the
+   *                               container
+   * @return a map between the auxiliary service names and their outputs
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract Map<String, ByteBuffer> startContainer(Container container,
+      ContainerLaunchContext containerLaunchContext)
+          throws YarnException, IOException;
+
+  /**
+   * <p>Stop a started container.</p>
+   *
+   * @param containerId the Id of the started container
+   * @param nodeId the Id of the <code>NodeManager</code>
+   * @param containerToken the security token to verify authenticity of the
+   *                       started container
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract void stopContainer(ContainerId containerId, NodeId nodeId,
+      Token containerToken) throws YarnException, IOException;
+
+  /**
+   * <p>Query the status of a container.</p>
+   *
+   * @param containerId the Id of the started container
+   * @param nodeId the Id of the <code>NodeManager</code>
+   * @param containerToken the security token to verify authenticity of the
+   *                       started container
+   * @return the status of a container
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
+      Token containerToken) throws YarnException, IOException;
+
+  /**
+   * <p>Set whether the containers that are started by this client, and are
+   * still running should be stopped when the client stops. By default, the
+   * feature should be enabled.</p>
+   *
+   * @param enabled whether the feature is enabled or not
+   */
+  public abstract void cleanupRunningContainersOnStop(boolean enabled);
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,295 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class YarnClient extends AbstractService {
+
+  /**
+   * Create a new instance of YarnClient.
+   */
+  @Public
+  public static YarnClient createYarnClient() {
+    YarnClient client = new YarnClientImpl();
+    return client;
+  }
+
+  /**
+   * Create a new instance of YarnClient.
+   */
+  @Public
+  public static YarnClient createYarnClient(InetSocketAddress rmAddress) {
+    YarnClient client = new YarnClientImpl(rmAddress);
+    return client;
+  }
+
+  /**
+   * Create a new instance of YarnClient.
+   */
+  @Public
+  public static YarnClient createYarnClient(String name,
+      InetSocketAddress rmAddress) {
+    YarnClient client = new YarnClientImpl(name, rmAddress);
+    return client;
+  }
+
+  @Private
+  protected YarnClient(String name) {
+    super(name);
+  }
+
+  /**
+   * <p>
+   * Obtain a new {@link ApplicationId} for submitting new applications.
+   * </p>
+   * 
+   * <p>
+   * Returns a response which contains {@link ApplicationId} that can be used to
+   * submit a new application. See
+   * {@link #submitApplication(ApplicationSubmissionContext)}.
+   * </p>
+   * 
+   * <p>
+   * See {@link GetNewApplicationResponse} for other information that is
+   * returned.
+   * </p>
+   * 
+   * @return response containing the new <code>ApplicationId</code> to be used
+   *         to submit an application
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract GetNewApplicationResponse getNewApplication() throws YarnException,
+      IOException;
+
+  /**
+   * <p>
+   * Submit a new application to <code>YARN</code>. It is a blocking call, such
+   * that it will not return {@link ApplicationId} until the application has
+   * been submitted and accepted by the ResourceManager.
+   * </p>
+   * 
+   * @param appContext
+   *          {@link ApplicationSubmissionContext} containing all the details
+   *          needed to submit a new application
+   * @return {@link ApplicationId} of the accepted application
+   * @throws YarnException
+   * @throws IOException
+   * @see #getNewApplication()
+   */
+  public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext)
+      throws YarnException, IOException;
+
+  /**
+   * <p>
+   * Kill an application identified by given ID.
+   * </p>
+   * 
+   * @param applicationId
+   *          {@link ApplicationId} of the application that needs to be killed
+   * @throws YarnException
+   *           in case of errors or if YARN rejects the request due to
+   *           access-control restrictions.
+   * @throws IOException
+   * @see #getQueueAclsInfo()
+   */
+  public abstract void killApplication(ApplicationId applicationId) throws YarnException,
+      IOException;
+
+  /**
+   * <p>
+   * Get a report of the given Application.
+   * </p>
+   * 
+   * <p>
+   * In secure mode, <code>YARN</code> verifies access to the application, queue
+   * etc. before accepting the request.
+   * </p>
+   * 
+   * <p>
+   * If the user does not have <code>VIEW_APP</code> access then the following
+   * fields in the report will be set to stubbed values:
+   * <ul>
+   * <li>host - set to "N/A"</li>
+   * <li>RPC port - set to -1</li>
+   * <li>client token - set to "N/A"</li>
+   * <li>diagnostics - set to "N/A"</li>
+   * <li>tracking URL - set to "N/A"</li>
+   * <li>original tracking URL - set to "N/A"</li>
+   * <li>resource usage report - all values are -1</li>
+   * </ul>
+   * </p>
+   * 
+   * @param appId
+   *          {@link ApplicationId} of the application that needs a report
+   * @return application report
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract ApplicationReport getApplicationReport(ApplicationId appId)
+      throws YarnException, IOException;
+
+  /**
+   * <p>
+   * Get a report (ApplicationReport) of all Applications in the cluster.
+   * </p>
+   * 
+   * <p>
+   * If the user does not have <code>VIEW_APP</code> access for an application
+   * then the corresponding report will be filtered as described in
+   * {@link #getApplicationReport(ApplicationId)}.
+   * </p>
+   * 
+   * @return a list of reports of all running applications
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract List<ApplicationReport> getApplicationList() throws YarnException,
+      IOException;
+
+  /**
+   * <p>
+   * Get metrics ({@link YarnClusterMetrics}) about the cluster.
+   * </p>
+   * 
+   * @return cluster metrics
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
+      IOException;
+
+  /**
+   * <p>
+   * Get a report of all nodes ({@link NodeReport}) in the cluster.
+   * </p>
+   * 
+   * @return A list of reports of all nodes
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract List<NodeReport> getNodeReports() throws YarnException, IOException;
+
+  /**
+   * <p>
+   * Get a delegation token so as to be able to talk to YARN using those tokens.
+   * </p>
+   * @param renewer
+   *          Address of the renewer who can renew these tokens when needed by
+   *          securely talking to YARN.
+   * @return a delegation token ({@link Token}) that can be used to
+   *         talk to YARN
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract Token getRMDelegationToken(Text renewer)
+      throws YarnException, IOException;
+
+  /**
+   * <p>
+   * Get information ({@link QueueInfo}) about a given <em>queue</em>.
+   * </p>
+   * 
+   * @param queueName
+   *          Name of the queue whose information is needed
+   * @return queue information
+   * @throws YarnException
+   *           in case of errors or if YARN rejects the request due to
+   *           access-control restrictions.
+   * @throws IOException
+   */
+  public abstract QueueInfo getQueueInfo(String queueName) throws YarnException,
+      IOException;
+
+  /**
+   * <p>
+   * Get information ({@link QueueInfo}) about all queues, recursively if there
+   * is a hierarchy.
+   * </p>
+   * 
+   * @return a list of queue-information for all queues
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract List<QueueInfo> getAllQueues() throws YarnException, IOException;
+
+  /**
+   * <p>
+   * Get information ({@link QueueInfo}) about top level queues.
+   * </p>
+   * 
+   * @return a list of queue-information for all the top-level queues
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract List<QueueInfo> getRootQueueInfos() throws YarnException, IOException;
+
+  /**
+   * <p>
+   * Get information ({@link QueueInfo}) about all the immediate children queues
+   * of the given queue.
+   * </p>
+   * 
+   * @param parent
+   *          Name of the queue whose child-queues' information is needed
+   * @return a list of queue-information for all queues that are direct children
+   *         of the given parent queue.
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract List<QueueInfo> getChildQueueInfos(String parent) throws YarnException,
+      IOException;
+
+  /**
+   * <p>
+   * Get information about <em>acls</em> for <em>current user</em> on all the
+   * existing queues.
+   * </p>
+   * 
+   * @return a list of queue acls ({@link QueueUserACLInfo}) for
+   *         <em>current user</em>
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
+      IOException;
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,245 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.async;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>AMRMClientAsync</code> handles communication with the ResourceManager
+ * and provides asynchronous updates on events such as container allocations and
+ * completions.  It contains a thread that sends periodic heartbeats to the
+ * ResourceManager.
+ * 
+ * It should be used by implementing a CallbackHandler:
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ *   public void onContainersAllocated(List<Container> containers) {
+ *     [run tasks on the containers]
+ *   }
+ *   
+ *   public void onContainersCompleted(List<ContainerStatus> statuses) {
+ *     [update progress, check whether app is done]
+ *   }
+ *   
+ *   public void onNodesUpdated(List<NodeReport> updated) {}
+ *   
+ *   public void onShutdownRequest() {}
+ * }
+ * }
+ * </pre>
+ * 
+ * The client's lifecycle should be managed similarly to the following:
+ * 
+ * <pre>
+ * {@code
+ * AMRMClientAsync asyncClient = AMRMClientAsync
+ *     .createAMRMClientAsync(appAttId, 1000, new MyCallbackHandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * RegisterApplicationMasterResponse response = asyncClient
+ *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ *       appMasterTrackingUrl);
+ * asyncClient.addContainerRequest(containerRequest);
+ * [... wait for application to complete]
+ * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Public
+@Stable
+public abstract class AMRMClientAsync<T extends ContainerRequest> 
+extends AbstractService {
+  
+  protected final AMRMClient<T> client;
+  protected final CallbackHandler handler;
+  protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
+
+  public static <T extends ContainerRequest> AMRMClientAsync<T> 
+  createAMRMClientAsync(
+      ApplicationAttemptId id, 
+      int intervalMs, 
+      CallbackHandler callbackHandler) {
+    return new AMRMClientAsyncImpl<T>(id, intervalMs, callbackHandler);
+  }
+  
+  public static <T extends ContainerRequest> AMRMClientAsync<T> 
+  createAMRMClientAsync(
+      AMRMClient<T> client, 
+      int intervalMs, 
+      CallbackHandler callbackHandler) {
+    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
+  }
+  
+  protected AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
+      CallbackHandler callbackHandler) {
+    this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
+  }
+  
+  @Private
+  @VisibleForTesting
+  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
+      CallbackHandler callbackHandler) {
+    super(AMRMClientAsync.class.getName());
+    this.client = client;
+    this.heartbeatIntervalMs.set(intervalMs);
+    this.handler = callbackHandler;
+  }
+    
+  public void setHeartbeatInterval(int interval) {
+    heartbeatIntervalMs.set(interval);
+  }
+  
+  public abstract List<? extends Collection<T>> getMatchingRequests(
+                                                   Priority priority, 
+                                                   String resourceName, 
+                                                   Resource capability);
+  
+  /**
+   * Registers this application master with the resource manager. On successful
+   * registration, starts the heartbeating thread.
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnException, IOException;
+
+  /**
+   * Unregister the application master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract void unregisterApplicationMaster(
+      FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 
+  throws YarnException, IOException;
+
+  /**
+   * Request containers for resources before calling <code>allocate</code>
+   * @param req Resource request
+   */
+  public abstract void addContainerRequest(T req);
+
+  /**
+   * Remove a previous container request. The previous container request may
+   * have already been sent to the ResourceManager, so even after the remove
+   * request the app must be prepared to receive an allocation for the
+   * previous request.
+   * @param req Resource request
+   */
+  public abstract void removeContainerRequest(T req);
+
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give up the container then it can release them.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it. eg. it released non-local resources
+   * @param containerId
+   */
+  public abstract void releaseAssignedContainer(ContainerId containerId);
+
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Currently available resources
+   */
+  public abstract Resource getClusterAvailableResources();
+
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Current number of nodes in the cluster
+   */
+  public abstract int getClusterNodeCount();
+
+  /**
+   * It returns the NMToken received on allocate call. It will not communicate
+   * with RM to get NMTokens. On allocate call whenever we receive new token
+   * along with new container AMRMClientAsync will cache this NMToken per node
+   * manager. This map returned should be shared with any application which is
+   * communicating with NodeManager (ex. NMClient / NMClientAsync) using
+   * NMTokens. If a new NMToken is received for the same node manager
+   * then it will be replaced. 
+   */
+  public abstract ConcurrentMap<String, Token> getNMTokens();
+  
+  public interface CallbackHandler {
+    
+    /**
+     * Called when the ResourceManager responds to a heartbeat with completed
+     * containers. If the response contains both completed containers and
+     * allocated containers, this will be called before onContainersAllocated.
+     */
+    public void onContainersCompleted(List<ContainerStatus> statuses);
+    
+    /**
+     * Called when the ResourceManager responds to a heartbeat with allocated
+     * containers. If the response contains both completed containers and
+     * allocated containers, this will be called after onContainersCompleted.
+     */
+    public void onContainersAllocated(List<Container> containers);
+    
+    /**
+     * Called when the ResourceManager wants the ApplicationMaster to shutdown
+     * for being out of sync etc. The ApplicationMaster should not unregister
+     * with the RM unless the ApplicationMaster wants to be the last attempt.
+     */
+    public void onShutdownRequest();
+    
+    /**
+     * Called when nodes tracked by the ResourceManager have changed in health,
+     * availability etc.
+     */
+    public void onNodesUpdated(List<NodeReport> updatedNodes);
+    
+    public float getProgress();
+    
+    public void onError(Exception e);
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>NMClientAsync</code> handles communication with all the NodeManagers
+ * and provides asynchronous updates on getting responses from them. It
+ * maintains a thread pool to communicate with individual NMs where a number of
+ * worker threads process requests to NMs by using {@link NMClientImpl}. The max
+ * size of the thread pool is configurable through
+ * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
+ *
+ * It should be used in conjunction with a CallbackHandler. For example
+ *
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
+ *   public void onContainerStarted(ContainerId containerId,
+ *       Map<String, ByteBuffer> allServiceResponse) {
+ *     [post process after the container is started, process the response]
+ *   }
+ *
+ *   public void onContainerStatusReceived(ContainerId containerId,
+ *       ContainerStatus containerStatus) {
+ *     [make use of the status of the container]
+ *   }
+ *
+ *   public void onContainerStopped(ContainerId containerId) {
+ *     [post process after the container is stopped]
+ *   }
+ *
+ *   public void onStartContainerError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ *
+ *   public void onGetContainerStatusError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ *
+ *   public void onStopContainerError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ * }
+ * }
+ * </pre>
+ *
+ * The client's life-cycle should be managed like the following:
+ *
+ * <pre>
+ * {@code
+ * NMClientAsync asyncClient = 
+ *     NMClientAsync.createNMClientAsync(new MyCallbackHandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * asyncClient.startContainerAsync(container, containerLaunchContext);
+ * [... wait for container being started]
+ * asyncClient.getContainerStatusAsync(container.getId(), container.getNodeId(),
+ *     container.getContainerToken());
+ * [... handle the status in the callback instance]
+ * asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
+ *     container.getContainerToken());
+ * [... wait for container being stopped]
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Public
+@Stable
+public abstract class NMClientAsync extends AbstractService {
+
+  protected NMClient client;
+  protected CallbackHandler callbackHandler;
+
+  public static NMClientAsync createNMClientAsync(CallbackHandler callbackHandler) {
+    return new NMClientAsyncImpl(callbackHandler);
+  }
+  
+  protected NMClientAsync(CallbackHandler callbackHandler) {
+    this (NMClientAsync.class.getName(), callbackHandler);
+  }
+
+  protected NMClientAsync(String name, CallbackHandler callbackHandler) {
+    this (name, new NMClientImpl(), callbackHandler);
+  }
+
+  @Private
+  @VisibleForTesting
+  protected NMClientAsync(String name, NMClient client,
+      CallbackHandler callbackHandler) {
+    super(name);
+    this.setClient(client);
+    this.setCallbackHandler(callbackHandler);
+  }
+
+  public abstract void startContainerAsync(
+      Container container, ContainerLaunchContext containerLaunchContext);
+
+  public abstract void stopContainerAsync(
+      ContainerId containerId, NodeId nodeId, Token containerToken);
+
+  public abstract void getContainerStatusAsync(
+      ContainerId containerId, NodeId nodeId, Token containerToken);
+  
+  public NMClient getClient() {
+    return client;
+  }
+
+  public void setClient(NMClient client) {
+    this.client = client;
+  }
+
+  public CallbackHandler getCallbackHandler() {
+    return callbackHandler;
+  }
+
+  public void setCallbackHandler(CallbackHandler callbackHandler) {
+    this.callbackHandler = callbackHandler;
+  }
+
+  /**
+   * <p>
+   * The callback interface needs to be implemented by {@link NMClientAsync}
+   * users. The APIs are called when responses from <code>NodeManager</code> are
+   * available.
+   * </p>
+   *
+   * <p>
+   * Once a callback happens, the users can choose to act on it in blocking or
+   * non-blocking manner. If the action on callback is done in a blocking
+   * manner, some of the threads performing requests on NodeManagers may get
+   * blocked depending on how many threads in the pool are busy.
+   * </p>
+   *
+   * <p>
+   * The implementation of the callback functions should not throw unexpected
+   * exceptions. Otherwise, {@link NMClientAsync} will just catch, log and
+   * then ignore them.
+   * </p>
+   */
+  public static interface CallbackHandler {
+    /**
+     * The API is called when <code>NodeManager</code> responds to indicate its
+     * acceptance of the starting container request
+     * @param containerId the Id of the container
+     * @param allServiceResponse a Map between the auxiliary service names and
+     *                           their outputs
+     */
+    void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse);
+
+    /**
+     * The API is called when <code>NodeManager</code> responds with the status
+     * of the container
+     * @param containerId the Id of the container
+     * @param containerStatus the status of the container
+     */
+    void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus);
+
+    /**
+     * The API is called when <code>NodeManager</code> responds to indicate the
+     * container is stopped.
+     * @param containerId the Id of the container
+     */
+    void onContainerStopped(ContainerId containerId);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * starting a container
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    void onStartContainerError(ContainerId containerId, Throwable t);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * querying the status of a container
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    void onGetContainerStatusError(ContainerId containerId, Throwable t);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * stopping a container
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    void onStopContainerError(ContainerId containerId, Throwable t);
+
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,342 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.async.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Asynchronous wrapper around an {@link AMRMClient}.
+ *
+ * <p>Two threads do the work. The heartbeat thread repeatedly calls
+ * <code>client.allocate(progress)</code> and puts each response on an internal
+ * queue. The callback handler thread takes responses off that queue and
+ * dispatches them to the <code>CallbackHandler</code> supplied at construction
+ * time.
+ *
+ * <p>NOTE(review): <code>client</code>, <code>handler</code> and
+ * <code>heartbeatIntervalMs</code> are used here but not declared in this
+ * class; presumably they are inherited from {@link AMRMClientAsync}.
+ *
+ * @param <T> the type of container request handled by the wrapped client
+ */
+@Private
+@Unstable
+public class AMRMClientAsyncImpl<T extends ContainerRequest> 
+extends AMRMClientAsync<T> {
+  
+  private static final Log LOG = LogFactory.getLog(AMRMClientAsyncImpl.class);
+  
+  // Periodically calls allocate on the wrapped client; started by
+  // registerApplicationMaster.
+  private final HeartbeatThread heartbeatThread;
+  // Dispatches queued allocate responses to the callback handler; started by
+  // serviceStart.
+  private final CallbackHandlerThread handlerThread;
+
+  // Hand-off queue from the heartbeat thread to the callback handler thread.
+  private final BlockingQueue<AllocateResponse> responseQueue;
+  
+  // Held while heartbeating and while unregistering, so that no heartbeat is
+  // sent after the application master has unregistered.
+  private final Object unregisterHeartbeatLock = new Object();
+  
+  // Cleared by serviceStop and unregisterApplicationMaster to end both loops.
+  private volatile boolean keepRunning;
+  // Last progress value reported by the callback handler; sent on heartbeats.
+  private volatile float progress;
+  
+  // Exception raised by a failed heartbeat, handed over to the callback
+  // handler thread which reports it through onError.
+  private volatile Exception savedException;
+  
+  /**
+   * Creates an asynchronous client backed by a new {@link AMRMClientImpl}.
+   * @param id the application attempt id passed to the wrapped client
+   * @param intervalMs the heartbeat interval in milliseconds
+   * @param callbackHandler the handler that receives the callbacks
+   */
+  public AMRMClientAsyncImpl(ApplicationAttemptId id, int intervalMs,
+      CallbackHandler callbackHandler) {
+    this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
+  }
+  
+  /**
+   * Creates an asynchronous client around the given client. Neither thread is
+   * started here.
+   * @param client the client that performs the actual calls
+   * @param intervalMs the heartbeat interval in milliseconds
+   * @param callbackHandler the handler that receives the callbacks
+   */
+  @Private
+  @VisibleForTesting
+  public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
+      CallbackHandler callbackHandler) {
+    super(client, intervalMs, callbackHandler);
+    heartbeatThread = new HeartbeatThread();
+    handlerThread = new CallbackHandlerThread();
+    responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+    keepRunning = true;
+    savedException = null;
+  }
+    
+  /**
+   * Initializes this service and then the wrapped client with the same
+   * configuration.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    client.init(conf);
+  }  
+  
+  /**
+   * Starts the callback handler thread and the wrapped client. The heartbeat
+   * thread is not started here; see {@link #registerApplicationMaster}.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    handlerThread.start();
+    client.start();
+    super.serviceStart();
+  }
+  
+  /**
+   * Tells the heartbeat and handler threads to stop and waits for them to
+   * terminate.  Calling this method from the callback handler thread would cause
+   * deadlock, and thus should be avoided.
+   *
+   * <p>If the calling thread is interrupted while waiting, shutdown still runs
+   * to completion and the interrupt status is restored before returning.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (Thread.currentThread() == handlerThread) {
+      throw new YarnRuntimeException("Cannot call stop from callback handler thread!");
+    }
+    keepRunning = false;
+    // Record an interrupt rather than restoring it right away: an immediately
+    // re-set flag would make the second join() below abort at once.
+    boolean interrupted = false;
+    try {
+      try {
+        heartbeatThread.join();
+      } catch (InterruptedException ex) {
+        LOG.error("Error joining with heartbeat thread", ex);
+        interrupted = true;
+      }
+      client.stop();
+      try {
+        // wake the handler thread in case it is blocked on the response queue
+        handlerThread.interrupt();
+        handlerThread.join();
+      } catch (InterruptedException ex) {
+        LOG.error("Error joining with handler thread", ex);
+        interrupted = true;
+      }
+      super.serviceStop();
+    } finally {
+      if (interrupted) {
+        // hand the interrupt back to the caller instead of swallowing it
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+  
+  /**
+   * Changes the interval between heartbeats.
+   * @param interval the new heartbeat interval in milliseconds
+   */
+  public void setHeartbeatInterval(int interval) {
+    heartbeatIntervalMs.set(interval);
+  }
+  
+  /**
+   * Returns the outstanding requests matching the given parameters, as
+   * reported by the wrapped client.
+   * @param priority priority of the requests
+   * @param resourceName resource name of the requests
+   * @param capability resource capability of the requests
+   * @return the result of the wrapped client's <code>getMatchingRequests</code>
+   */
+  public List<? extends Collection<T>> getMatchingRequests(
+                                                   Priority priority, 
+                                                   String resourceName, 
+                                                   Resource capability) {
+    return client.getMatchingRequests(priority, resourceName, capability);
+  }
+  
+  /**
+   * Registers this application master with the resource manager. On successful
+   * registration, starts the heartbeating thread.
+   *
+   * <p>The heartbeat thread is started on every successful call, and a
+   * <code>Thread</code> can only be started once, so this method is meant to
+   * be called a single time.
+   * @param appHostName host name passed on to the wrapped client
+   * @param appHostPort port passed on to the wrapped client
+   * @param appTrackingUrl tracking URL passed on to the wrapped client
+   * @return the registration response returned by the wrapped client
+   * @throws YarnException
+   * @throws IOException
+   */
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnException, IOException {
+    RegisterApplicationMasterResponse response = client
+        .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
+    heartbeatThread.start();
+    return response;
+  }
+
+  /**
+   * Unregister the application master. This must be called in the end.
+   * Heartbeating is switched off under the same lock the heartbeat thread
+   * uses, so no heartbeat is sent once unregistration has begun.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnException
+   * @throws IOException
+   */
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+      String appMessage, String appTrackingUrl) throws YarnException,
+      IOException {
+    synchronized (unregisterHeartbeatLock) {
+      keepRunning = false;
+      client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
+    }
+  }
+
+  /**
+   * Request containers for resources before calling <code>allocate</code>
+   * @param req Resource request
+   */
+  public void addContainerRequest(T req) {
+    client.addContainerRequest(req);
+  }
+
+  /**
+   * Remove previous container request. The previous container request may have 
+   * already been sent to the ResourceManager. So even after the remove request 
+   * the app must be prepared to receive an allocation for the previous request.
+   * @param req Resource request
+   */
+  public void removeContainerRequest(T req) {
+    client.removeContainerRequest(req);
+  }
+
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give up the container then it can release them.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it. eg. it released non-local resources
+   * @param containerId the Id of the container to release
+   */
+  public void releaseAssignedContainer(ContainerId containerId) {
+    client.releaseAssignedContainer(containerId);
+  }
+
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Currently available resources
+   */
+  public Resource getClusterAvailableResources() {
+    return client.getClusterAvailableResources();
+  }
+
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Current number of nodes in the cluster
+   */
+  public int getClusterNodeCount() {
+    return client.getClusterNodeCount();
+  }
+
+  /**
+   * It returns the NMToken received on allocate call. It will not communicate
+   * with RM to get NMTokens. On allocate call whenever we receive new token
+   * along with new container AMRMClientAsync will cache this NMToken per node
+   * manager. This map returned should be shared with any application which is
+   * communicating with NodeManager (ex. NMClient / NMClientAsync) using
+   * NMTokens. If a new NMToken is received for the same node manager
+   * then it will be replaced. 
+   * @return the NMToken map held by the wrapped client
+   */
+  public ConcurrentMap<String, Token> getNMTokens() {
+    return client.getNMTokens();
+  }
+  
+  /**
+   * Thread that calls allocate on the wrapped client once per heartbeat
+   * interval and queues every response for the callback handler thread. It
+   * exits when <code>keepRunning</code> is cleared or when a heartbeat fails.
+   */
+  private class HeartbeatThread extends Thread {
+    public HeartbeatThread() {
+      super("AMRM Heartbeater thread");
+    }
+    
+    @Override
+    public void run() {
+      while (true) {
+        AllocateResponse response = null;
+        // synchronization ensures we don't send heartbeats after unregistering
+        synchronized (unregisterHeartbeatLock) {
+          if (!keepRunning) {
+            break;
+          }
+            
+          try {
+            response = client.allocate(progress);
+          } catch (YarnException ex) {
+            LOG.error("Yarn exception on heartbeat", ex);
+            savedException = ex;
+            // interrupt handler thread in case it is waiting on the queue
+            handlerThread.interrupt();
+            break;
+          } catch (IOException e) {
+            LOG.error("IO exception on heartbeat", e);
+            savedException = e;
+            // interrupt handler thread in case it is waiting on the queue
+            handlerThread.interrupt();
+            break;
+          }
+        }
+        if (response != null) {
+          // retry until the response is queued; an interrupt must not drop it
+          while (true) {
+            try {
+              responseQueue.put(response);
+              break;
+            } catch (InterruptedException ex) {
+              LOG.info("Interrupted while waiting to put on response queue", ex);
+            }
+          }
+        }
+        
+        try {
+          Thread.sleep(heartbeatIntervalMs.get());
+        } catch (InterruptedException ex) {
+          // not fatal: the loop re-checks keepRunning on its next pass
+          LOG.info("Heartbeater interrupted", ex);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Thread that takes allocate responses off the queue and invokes the
+   * matching callbacks on the handler. It exits when <code>keepRunning</code>
+   * is cleared, after reporting a saved heartbeat exception through
+   * <code>onError</code>, or after a shutdown/resync command.
+   */
+  private class CallbackHandlerThread extends Thread {
+    public CallbackHandlerThread() {
+      super("AMRM Callback Handler Thread");
+    }
+    
+    @Override
+    public void run() {
+      while (keepRunning) {
+        AllocateResponse response;
+        try {
+          if(savedException != null) {
+            LOG.error("Stopping callback due to: ", savedException);
+            handler.onError(savedException);
+            break;
+          }
+          response = responseQueue.take();
+        } catch (InterruptedException ex) {
+          // interrupted by serviceStop or by a failed heartbeat; loop around
+          // to re-check keepRunning and savedException
+          LOG.info("Interrupted while waiting for queue", ex);
+          continue;
+        }
+
+        if (response.getAMCommand() != null) {
+          switch(response.getAMCommand()) {
+          case AM_RESYNC:
+          case AM_SHUTDOWN:
+            handler.onShutdownRequest();
+            LOG.info("Shutdown requested. Stopping callback.");
+            break;
+          default:
+            String msg =
+                  "Unhandled value of AMCommand: " + response.getAMCommand();
+            LOG.error(msg);
+            throw new YarnRuntimeException(msg);
+          }
+          // Only the resync/shutdown path gets here (the default branch
+          // throws), so leave the dispatch loop.
+          // should probably stop heartbeating also YARN-763
+          break;
+        }
+        List<NodeReport> updatedNodes = response.getUpdatedNodes();
+        if (!updatedNodes.isEmpty()) {
+          handler.onNodesUpdated(updatedNodes);
+        }
+        
+        List<ContainerStatus> completed =
+            response.getCompletedContainersStatuses();
+        if (!completed.isEmpty()) {
+          handler.onContainersCompleted(completed);
+        }
+
+        List<Container> allocated = response.getAllocatedContainers();
+        if (!allocated.isEmpty()) {
+          handler.onContainersAllocated(allocated);
+        }
+        
+        // picked up by the heartbeat thread on its next allocate call
+        progress = handler.getProgress();
+      }
+    }
+  }
+}