You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/03/21 23:26:06 UTC

svn commit: r1459555 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ hadoop-yarn/hadoop-yarn-client/src...

Author: bikas
Date: Thu Mar 21 22:26:06 2013
New Revision: 1459555

URL: http://svn.apache.org/r1459555
Log:
YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks. (Sandy Ryza via bikas)

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1459555&r1=1459554&r2=1459555&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Mar 21 22:26:06 2013
@@ -88,6 +88,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-297. Improve hashCode implementations for PB records. (Xuan Gong via
     hitesh)
 
+    YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks.
+    (Sandy Ryza via bikas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1459555&r1=1459554&r2=1459555&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Thu Mar 21 22:26:06 2013
@@ -63,12 +63,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient;
 import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -147,8 +147,8 @@ public class ApplicationMaster {
   private YarnRPC rpc;
 
   // Handle to communicate with the Resource Manager
-  private AMRMClient resourceManager;
-
+  private AMRMClientAsync resourceManager;
+  
   // Application Attempt Id ( combination of attemptId and fail count )
   private ApplicationAttemptId appAttemptID;
 
@@ -169,8 +169,6 @@ public class ApplicationMaster {
   // Priority of the request
   private int requestPriority;
 
-  // Simple flag to denote whether all works is done
-  private boolean appDone = false;
   // Counter for completed containers ( complete denotes successful or failed )
   private AtomicInteger numCompletedContainers = new AtomicInteger();
   // Allocated container count so that we know how many containers has the RM
@@ -201,6 +199,9 @@ public class ApplicationMaster {
   // Hardcoded path to shell script in launch container's local env
   private final String ExecShellStringPath = "ExecShellScript.sh";
 
+  private volatile boolean done;
+  private volatile boolean success;
+  
   // Launch threads
   private List<Thread> launchThreads = new ArrayList<Thread>();
 
@@ -416,226 +417,202 @@ public class ApplicationMaster {
   public boolean run() throws YarnRemoteException {
     LOG.info("Starting ApplicationMaster");
 
-    // Connect to ResourceManager
-    resourceManager = new AMRMClientImpl(appAttemptID);
+    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+    
+    resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
     resourceManager.init(conf);
     resourceManager.start();
 
-    try {
-      // Setup local RPC Server to accept status requests directly from clients
-      // TODO need to setup a protocol for client to be able to communicate to
-      // the RPC server
-      // TODO use the rpc port info to register with the RM for the client to
-      // send requests to this app master
-
-      // Register self with ResourceManager
-      RegisterApplicationMasterResponse response = resourceManager
-          .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
-              appMasterTrackingUrl);
-      // Dump out information about cluster capability as seen by the
-      // resource manager
-      int minMem = response.getMinimumResourceCapability().getMemory();
-      int maxMem = response.getMaximumResourceCapability().getMemory();
-      LOG.info("Min mem capabililty of resources in this cluster " + minMem);
-      LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
-      // A resource ask has to be atleast the minimum of the capability of the
-      // cluster, the value has to be a multiple of the min value and cannot
-      // exceed the max.
-      // If it is not an exact multiple of min, the RM will allocate to the
-      // nearest multiple of min
-      if (containerMemory < minMem) {
-        LOG.info("Container memory specified below min threshold of cluster."
-            + " Using min value." + ", specified=" + containerMemory + ", min="
-            + minMem);
-        containerMemory = minMem;
-      } else if (containerMemory > maxMem) {
-        LOG.info("Container memory specified above max threshold of cluster."
-            + " Using max value." + ", specified=" + containerMemory + ", max="
-            + maxMem);
-        containerMemory = maxMem;
-      }
-
-      // Setup heartbeat emitter
-      // TODO poll RM every now and then with an empty request to let RM know
-      // that we are alive
-      // The heartbeat interval after which an AM is timed out by the RM is
-      // defined by a config setting:
-      // RM_AM_EXPIRY_INTERVAL_MS with default defined by
-      // DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
-      // The allocate calls to the RM count as heartbeats so, for now,
-      // this additional heartbeat emitter is not required.
-
-      // Setup ask for containers from RM
-      // Send request for containers to RM
-      // Until we get our fully allocated quota, we keep on polling RM for
-      // containers
-      // Keep looping until all the containers are launched and shell script
-      // executed on them ( regardless of success/failure).
-
-      int loopCounter = -1;
-
-      while (numCompletedContainers.get() < numTotalContainers && !appDone) {
-        loopCounter++;
-
-        // log current state
-        LOG.info("Current application state: loop=" + loopCounter
-            + ", appDone=" + appDone + ", total=" + numTotalContainers
-            + ", requested=" + numRequestedContainers + ", completed="
-            + numCompletedContainers + ", failed=" + numFailedContainers
-            + ", currentAllocated=" + numAllocatedContainers);
-
-        // Sleep before each loop when asking RM for containers
-        // to avoid flooding RM with spurious requests when it
-        // need not have any available containers
-        // Sleeping for 1000 ms.
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          LOG.info("Sleep interrupted " + e.getMessage());
-        }
+    // Setup local RPC Server to accept status requests directly from clients
+    // TODO need to setup a protocol for client to be able to communicate to
+    // the RPC server
+    // TODO use the rpc port info to register with the RM for the client to
+    // send requests to this app master
+
+    // Register self with ResourceManager
+    // This will start heartbeating to the RM
+    RegisterApplicationMasterResponse response = resourceManager
+        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+            appMasterTrackingUrl);
+    // Dump out information about cluster capability as seen by the
+    // resource manager
+    int minMem = response.getMinimumResourceCapability().getMemory();
+    int maxMem = response.getMaximumResourceCapability().getMemory();
+    LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+    // A resource ask has to be at least the minimum of the capability of the
+    // cluster, the value has to be a multiple of the min value and cannot
+    // exceed the max.
+    // If it is not an exact multiple of min, the RM will allocate to the
+    // nearest multiple of min
+    if (containerMemory < minMem) {
+      LOG.info("Container memory specified below min threshold of cluster."
+          + " Using min value." + ", specified=" + containerMemory + ", min="
+          + minMem);
+      containerMemory = minMem;
+    } else if (containerMemory > maxMem) {
+      LOG.info("Container memory specified above max threshold of cluster."
+          + " Using max value." + ", specified=" + containerMemory + ", max="
+          + maxMem);
+      containerMemory = maxMem;
+    }
 
-        // No. of containers to request
-        // For the first loop, askCount will be equal to total containers needed
-        // From that point on, askCount will always be 0 as current
-        // implementation does not change its ask on container failures.
-        int askCount = numTotalContainers - numRequestedContainers.get();
-        numRequestedContainers.addAndGet(askCount);
-
-        if (askCount > 0) {
-          ContainerRequest containerAsk = setupContainerAskForRM(askCount);
-          resourceManager.addContainerRequest(containerAsk);
-        }
 
-        // Send the request to RM
-        LOG.info("Asking RM for containers" + ", askCount=" + askCount);
-        AllocateResponse allocResp = sendContainerAskToRM();
-
-        // Retrieve list of allocated containers from the response
-        List<Container> allocatedContainers =
-            allocResp.getAllocatedContainers();
-        LOG.info("Got response from RM for container ask, allocatedCnt="
-            + allocatedContainers.size());
-        numAllocatedContainers.addAndGet(allocatedContainers.size());
-        for (Container allocatedContainer : allocatedContainers) {
-          LOG.info("Launching shell command on a new container."
-              + ", containerId=" + allocatedContainer.getId()
-              + ", containerNode=" + allocatedContainer.getNodeId().getHost()
-              + ":" + allocatedContainer.getNodeId().getPort()
-              + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
-              + ", containerState" + allocatedContainer.getState()
-              + ", containerResourceMemory"
-              + allocatedContainer.getResource().getMemory());
-          // + ", containerToken"
-          // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
-          LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
-              allocatedContainer);
-          Thread launchThread = new Thread(runnableLaunchContainer);
-
-          // launch and start the container on a separate thread to keep
-          // the main thread unblocked
-          // as all containers may not be allocated at one go.
-          launchThreads.add(launchThread);
-          launchThread.start();
-        }
+    // Setup ask for containers from RM
+    // Send request for containers to RM
+    // Until we get our fully allocated quota, we keep on polling RM for
+    // containers
+    // Keep looping until all the containers are launched and shell script
+    // executed on them ( regardless of success/failure).
+    ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers);
+    resourceManager.addContainerRequest(containerAsk);
+    numRequestedContainers.set(numTotalContainers);
 
-        // Check what the current available resources in the cluster are
-        // TODO should we do anything if the available resources are not enough?
-        Resource availableResources = allocResp.getAvailableResources();
-        LOG.info("Current available resources in the cluster "
-            + availableResources);
-
-        // Check the completed containers
-        List<ContainerStatus> completedContainers = allocResp
-            .getCompletedContainersStatuses();
-        LOG.info("Got response from RM for container ask, completedCnt="
-            + completedContainers.size());
-        for (ContainerStatus containerStatus : completedContainers) {
-          LOG.info("Got container status for containerID="
-              + containerStatus.getContainerId() + ", state="
-              + containerStatus.getState() + ", exitStatus="
-              + containerStatus.getExitStatus() + ", diagnostics="
-              + containerStatus.getDiagnostics());
-
-          // non complete containers should not be here
-          assert (containerStatus.getState() == ContainerState.COMPLETE);
-
-          // increment counters for completed/failed containers
-          int exitStatus = containerStatus.getExitStatus();
-          if (0 != exitStatus) {
-            // container failed
-            if (-100 != exitStatus) {
-              // shell script failed
-              // counts as completed
-              numCompletedContainers.incrementAndGet();
-              numFailedContainers.incrementAndGet();
-            } else {
-              // something else bad happened
-              // app job did not complete for some reason
-              // we should re-try as the container was lost for some reason
-              numAllocatedContainers.decrementAndGet();
-              numRequestedContainers.decrementAndGet();
-              // we do not need to release the container as it would be done
-              // by the RM/CM.
-            }
-          } else {
-            // nothing to do
-            // container completed successfully
+    while (!done) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException ex) {}
+    }
+    finish();
+    
+    return success;
+  }
+  
+  private void finish() {
+    // Join all launched threads
+    // needed for when we time out
+    // and we need to release containers
+    for (Thread launchThread : launchThreads) {
+      try {
+        launchThread.join(10000);
+      } catch (InterruptedException e) {
+        LOG.info("Exception thrown in thread join: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+
+    // When the application completes, it should send a finish application
+    // signal to the RM
+    LOG.info("Application completed. Signalling finish to RM");
+
+    FinalApplicationStatus appStatus;
+    String appMessage = null;
+    success = true;
+    if (numFailedContainers.get() == 0) {
+      appStatus = FinalApplicationStatus.SUCCEEDED;
+    } else {
+      appStatus = FinalApplicationStatus.FAILED;
+      appMessage = "Diagnostics." + ", total=" + numTotalContainers
+          + ", completed=" + numCompletedContainers.get() + ", allocated="
+          + numAllocatedContainers.get() + ", failed="
+          + numFailedContainers.get();
+      success = false;
+    }
+    try {
+      resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+    } catch (YarnRemoteException ex) {
+      LOG.error("Failed to unregister application", ex);
+    }
+    
+    done = true;
+    resourceManager.stop();
+  }
+  
+  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+      LOG.info("Got response from RM for container ask, completedCnt="
+          + completedContainers.size());
+      for (ContainerStatus containerStatus : completedContainers) {
+        LOG.info("Got container status for containerID="
+            + containerStatus.getContainerId() + ", state="
+            + containerStatus.getState() + ", exitStatus="
+            + containerStatus.getExitStatus() + ", diagnostics="
+            + containerStatus.getDiagnostics());
+
+        // non complete containers should not be here
+        assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+        // increment counters for completed/failed containers
+        int exitStatus = containerStatus.getExitStatus();
+        if (0 != exitStatus) {
+          // container failed
+          if (YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS != exitStatus) {
+            // shell script failed
+            // counts as completed
             numCompletedContainers.incrementAndGet();
-            LOG.info("Container completed successfully." + ", containerId="
-                + containerStatus.getContainerId());
+            numFailedContainers.incrementAndGet();
+          } else {
+            // container was killed by framework, possibly preempted
+            // we should re-try as the container was lost for some reason
+            numAllocatedContainers.decrementAndGet();
+            numRequestedContainers.decrementAndGet();
+            // we do not need to release the container as it would be done
+            // by the RM
           }
+        } else {
+          // nothing to do
+          // container completed successfully
+          numCompletedContainers.incrementAndGet();
+          LOG.info("Container completed successfully." + ", containerId="
+              + containerStatus.getContainerId());
         }
-        if (numCompletedContainers.get() == numTotalContainers) {
-          appDone = true;
-        }
-
-        LOG.info("Current application state: loop=" + loopCounter
-            + ", appDone=" + appDone + ", total=" + numTotalContainers
-            + ", requested=" + numRequestedContainers + ", completed="
-            + numCompletedContainers + ", failed=" + numFailedContainers
-            + ", currentAllocated=" + numAllocatedContainers);
-
-        // TODO
-        // Add a timeout handling layer
-        // for misbehaving shell commands
       }
-
-      // Join all launched threads
-      // needed for when we time out
-      // and we need to release containers
-      for (Thread launchThread : launchThreads) {
-        try {
-          launchThread.join(10000);
-        } catch (InterruptedException e) {
-          LOG.info("Exception thrown in thread join: " + e.getMessage());
-          e.printStackTrace();
-        }
+      
+      // ask for more containers if any failed
+      int askCount = numTotalContainers - numRequestedContainers.get();
+      numRequestedContainers.addAndGet(askCount);
+
+      if (askCount > 0) {
+        ContainerRequest containerAsk = setupContainerAskForRM(askCount);
+        resourceManager.addContainerRequest(containerAsk);
+      }
+      
+      // set progress to deliver to RM on next heartbeat
+      float progress = (float) numCompletedContainers.get()
+          / numTotalContainers;
+      resourceManager.setProgress(progress);
+      
+      if (numCompletedContainers.get() == numTotalContainers) {
+        done = true;
       }
+    }
 
-      // When the application completes, it should send a finish application
-      // signal to the RM
-      LOG.info("Application completed. Signalling finish to RM");
-
-      FinalApplicationStatus appStatus;
-      String appMessage = null;
-      boolean isSuccess = true;
-      if (numFailedContainers.get() == 0) {
-        appStatus = FinalApplicationStatus.SUCCEEDED;
-      } else {
-        appStatus = FinalApplicationStatus.FAILED;
-        appMessage = "Diagnostics." + ", total=" + numTotalContainers
-            + ", completed=" + numCompletedContainers.get() + ", allocated="
-            + numAllocatedContainers.get() + ", failed="
-            + numFailedContainers.get();
-        isSuccess = false;
+    @Override
+    public void onContainersAllocated(List<Container> allocatedContainers) {
+      LOG.info("Got response from RM for container ask, allocatedCnt="
+          + allocatedContainers.size());
+      numAllocatedContainers.addAndGet(allocatedContainers.size());
+      for (Container allocatedContainer : allocatedContainers) {
+        LOG.info("Launching shell command on a new container."
+            + ", containerId=" + allocatedContainer.getId()
+            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+            + ":" + allocatedContainer.getNodeId().getPort()
+            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+            + ", containerState" + allocatedContainer.getState()
+            + ", containerResourceMemory"
+            + allocatedContainer.getResource().getMemory());
+        // + ", containerToken"
+        // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+        LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+            allocatedContainer);
+        Thread launchThread = new Thread(runnableLaunchContainer);
+
+        // launch and start the container on a separate thread to keep
+        // the main thread unblocked
+        // as all containers may not be allocated at one go.
+        launchThreads.add(launchThread);
+        launchThread.start();
       }
-      resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
-      return isSuccess;
-    } finally {
-      resourceManager.stop();
     }
+
+    @Override
+    public void onRebootRequest() {}
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
   }
 
   /**
@@ -811,21 +788,4 @@ public class ApplicationMaster {
     LOG.info("Requested container ask: " + request.toString());
     return request;
   }
-
-  /**
-   * Ask RM to allocate given no. of containers to this Application Master
-   *
-   * @param requestedContainers Containers to ask for from RM
-   * @return Response from RM to AM with allocated containers
-   * @throws YarnRemoteException
-   */
-  private AllocateResponse sendContainerAskToRM() throws YarnRemoteException {
-    float progressIndicator = (float) numCompletedContainers.get()
-        / numTotalContainers;
-
-    LOG.info("Sending request to RM for containers" + ", progress="
-        + progressIndicator);
-
-    return resourceManager.allocate(progressIndicator);
-  }
 }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java?rev=1459555&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java Thu Mar 21 22:26:06 2013
@@ -0,0 +1,354 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>AMRMClientAsync</code> handles communication with the ResourceManager
+ * and provides asynchronous updates on events such as container allocations and
+ * completions.  It contains a thread that sends periodic heartbeats to the
+ * ResourceManager.
+ * 
+ * It should be used by implementing a CallbackHandler:
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ *   public void onContainersAllocated(List<Container> containers) {
+ *     [run tasks on the containers]
+ *   }
+ *   
+ *   public void onContainersCompleted(List<ContainerStatus> statuses) {
+ *     [update progress, check whether app is done]
+ *   }
+ *   
+ *   public void onNodesUpdated(List<NodeReport> updated) {}
+ *   
+ *   public void onRebootRequest() {}
+ * }
+ * }
+ * </pre>
+ * 
+ * The client's lifecycle should be managed similarly to the following:
+ * 
+ * <pre>
+ * {@code
+ * AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackHandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * RegisterApplicationMasterResponse response = asyncClient
+ *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ *       appMasterTrackingUrl);
+ * asyncClient.addContainerRequest(containerRequest);
+ * [... wait for application to complete]
+ * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Unstable
+@Evolving
+public class AMRMClientAsync extends AbstractService {
+  
+  private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
+  
+  private final AMRMClient client;
+  private final int intervalMs;
+  private final HeartbeatThread heartbeatThread;
+  private final CallbackHandlerThread handlerThread;
+  private final CallbackHandler handler;
+
+  private final BlockingQueue<AllocateResponse> responseQueue;
+  
+  private volatile boolean keepRunning;
+  private volatile float progress;
+  
+  public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
+      CallbackHandler callbackHandler) {
+    this(new AMRMClientImpl(id), intervalMs, callbackHandler);
+  }
+  
+  @Private
+  @VisibleForTesting
+  AMRMClientAsync(AMRMClient client, int intervalMs,
+      CallbackHandler callbackHandler) {
+    super(AMRMClientAsync.class.getName());
+    this.client = client;
+    this.intervalMs = intervalMs;
+    handler = callbackHandler;
+    heartbeatThread = new HeartbeatThread();
+    handlerThread = new CallbackHandlerThread();
+    responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+    keepRunning = true;
+  }
+  
+  /**
+   * Sets the application's current progress. It will be transmitted to the
+   * resource manager on the next heartbeat.
+   * @param progress
+   *    the application's progress so far
+   */
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+  
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    client.init(conf);
+  }
+  
+  @Override
+  public void start() {
+    handlerThread.start();
+    client.start();
+    super.start();
+  }
+  
+  /**
+   * Tells the heartbeat and handler threads to stop and waits for them to
+   * terminate.  Calling this method from the callback handler thread would cause
+   * deadlock, and thus should be avoided.
+   */
+  @Override
+  public void stop() {
+    if (Thread.currentThread() == handlerThread) {
+      throw new YarnException("Cannot call stop from callback handler thread!");
+    }
+    keepRunning = false;
+    try {
+      heartbeatThread.join();
+    } catch (InterruptedException ex) {
+      LOG.error("Error joining with heartbeat thread", ex);
+    }
+    client.stop();
+    try {
+      handlerThread.interrupt();
+      handlerThread.join();
+    } catch (InterruptedException ex) {
+      LOG.error("Error joining with hander thread", ex);
+    }
+    super.stop();
+  }
+  
+  /**
+   * Registers this application master with the resource manager. On successful
+   * registration, starts the heartbeating thread.
+   */
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnRemoteException {
+    RegisterApplicationMasterResponse response =
+        client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
+    heartbeatThread.start();
+    return response;
+  }
+
+  /**
+   * Unregister the application master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnRemoteException
+   */
+  public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+      String appMessage, String appTrackingUrl) throws YarnRemoteException {
+    synchronized (client) {
+      keepRunning = false;
+      client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
+    }
+  }
+
+  /**
+   * Hands a container request to the wrapped client.
+   * @param req Resource request
+   */
+  public void addContainerRequest(AMRMClient.ContainerRequest req) {
+    this.client.addContainerRequest(req);
+  }
+
+  /**
+   * Removes a previously added container request. The request may already have
+   * been sent to the ResourceManager, so the app must still be prepared to
+   * receive an allocation for it after this call returns. The removal itself
+   * is delegated to the wrapped client.
+   * @param req Resource request
+   */
+  public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+    this.client.removeContainerRequest(req);
+  }
+
+  /**
+   * Releases a container assigned by the ResourceManager, for instance when
+   * the app cannot use it or no longer wants it. If the app still needs the
+   * released capability (e.g. it gave up a non-local container) it has to
+   * issue a new request.
+   * @param containerId id of the container to release
+   */
+  public void releaseAssignedContainer(ContainerId containerId) {
+    this.client.releaseAssignedContainer(containerId);
+  }
+
+  /**
+   * Returns the available cluster resources as reported by the wrapped client.
+   * The value is only valid once a call to allocate has been made.
+   * @return Currently available resources
+   */
+  public Resource getClusterAvailableResources() {
+    return this.client.getClusterAvailableResources();
+  }
+
+  /**
+   * Returns the number of cluster nodes as reported by the wrapped client.
+   * The value is only valid once a call to allocate has been made.
+   * @return Current number of nodes in the cluster
+   */
+  public int getClusterNodeCount() {
+    return this.client.getClusterNodeCount();
+  }
+  
+  private class HeartbeatThread extends Thread { // calls allocate in a loop
+    public HeartbeatThread() {
+      super("AMRM Heartbeater thread");
+    }
+    // Loops until keepRunning is cleared: allocate, enqueue the response, sleep.
+    public void run() {
+      while (true) {
+        AllocateResponse response = null;
+        // synchronization ensures we don't send heartbeats after unregistering
+        synchronized (client) {
+          if (!keepRunning) {
+            break;
+          }
+          // a failed allocate is only logged; response stays null for this round
+          try {
+            response = client.allocate(progress);
+          } catch (YarnRemoteException ex) {
+            LOG.error("Failed to heartbeat", ex);
+          }
+        }
+        if (response != null) {
+          while (true) { // retry the put until it succeeds, even if interrupted
+            try {
+              responseQueue.put(response);
+              break;
+            } catch (InterruptedException ex) {
+              LOG.warn("Interrupted while waiting to put on response queue", ex);
+            }
+          }
+        }
+        // wait intervalMs before the next heartbeat; an interrupt just cuts it short
+        try {
+          Thread.sleep(intervalMs);
+        } catch (InterruptedException ex) {
+          LOG.warn("Heartbeater interrupted", ex);
+        }
+      }
+    }
+  }
+  
+  private class CallbackHandlerThread extends Thread {
+    public CallbackHandlerThread() {
+      super("AMRM Callback Handler Thread");
+    }
+    // Takes responses off the queue and fires the matching handler callbacks.
+    public void run() {
+      while (keepRunning) {
+        AllocateResponse heartbeat;
+        try {
+          heartbeat = responseQueue.take();
+        } catch (InterruptedException ex) {
+          LOG.info("Interrupted while waiting for queue");
+          continue;
+        }
+        // dispatch order: reboot, node updates, completions, allocations
+        if (heartbeat.getReboot()) {
+          handler.onRebootRequest();
+        }
+        List<NodeReport> changedNodes = heartbeat.getUpdatedNodes();
+        if (!changedNodes.isEmpty()) {
+          handler.onNodesUpdated(changedNodes);
+        }
+        // completed containers are reported before newly allocated ones
+        List<ContainerStatus> finished =
+            heartbeat.getCompletedContainersStatuses();
+        if (!finished.isEmpty()) {
+          handler.onContainersCompleted(finished);
+        }
+
+        List<Container> granted = heartbeat.getAllocatedContainers();
+        if (!granted.isEmpty()) {
+          handler.onContainersAllocated(granted);
+        }
+      }
+    }
+  }
+  
+  public interface CallbackHandler {
+
+    /**
+     * Invoked when a heartbeat response from the ResourceManager carries
+     * completed containers. If the same response also carries allocated
+     * containers, this is invoked before onContainersAllocated.
+     */
+    void onContainersCompleted(List<ContainerStatus> statuses);
+
+    /**
+     * Invoked when a heartbeat response from the ResourceManager carries
+     * allocated containers. If the same response also carries completed
+     * containers, this is invoked after onContainersCompleted.
+     */
+    void onContainersAllocated(List<Container> containers);
+
+    /**
+     * Invoked when the ResourceManager asks the ApplicationMaster to reboot
+     * because it is out of sync.
+     */
+    void onRebootRequest();
+
+    /**
+     * Invoked when nodes tracked by the ResourceManager have changed in
+     * health, availability etc.
+     */
+    void onNodesUpdated(List<NodeReport> updatedNodes);
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java?rev=1459555&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java Thu Mar 21 22:26:06 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.mockito.Mockito.anyFloat;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestAMRMClientAsync {
+  // Exercises AMRMClientAsync against a Mockito-mocked AMRMClient.
+  private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
+  // Heartbeats must keep flowing while a callback is still blocked.
+  @Test(timeout=10000)
+  public void testAMRMClientAsync() throws Exception {
+    Configuration conf = new Configuration();
+    List<ContainerStatus> completed1 = Arrays.asList(
+        BuilderUtils.newContainerStatus(
+            BuilderUtils.newContainerId(0, 0, 0, 0),
+            ContainerState.COMPLETE, "", 0));
+    List<Container> allocated1 = Arrays.asList(
+        BuilderUtils.newContainer(null, null, null, null, null, null));
+    final AllocateResponse response1 = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), allocated1);
+    final AllocateResponse response2 = createAllocateResponse(completed1,
+        new ArrayList<Container>());
+    final AllocateResponse emptyResponse = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>());
+
+    TestCallbackHandler callbackHandler = new TestCallbackHandler();
+    AMRMClient client = mock(AMRMClient.class);
+    final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false);
+    when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
+      @Override
+      public AllocateResponse answer(InvocationOnMock invocation)
+          throws Throwable {
+        secondHeartbeatReceived.set(true);
+        return response2;
+      }
+    }).thenReturn(emptyResponse);
+    when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
+      .thenReturn(null);
+
+    AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler);
+    asyncClient.init(conf);
+    asyncClient.start();
+    asyncClient.registerApplicationMaster("localhost", 1234, null);
+
+    // while the CallbackHandler will still only be processing the first response,
+    // heartbeater thread should still be sending heartbeats.
+    // To test this, wait for the second heartbeat to be received.
+    while (!secondHeartbeatReceived.get()) {
+      Thread.sleep(10);
+    }
+
+    // allocated containers should come before completed containers
+    Assert.assertNull(callbackHandler.takeCompletedContainers());
+
+    // wait for the allocated containers from the first heartbeat's response
+    while (callbackHandler.takeAllocatedContainers() == null) {
+      Assert.assertNull(callbackHandler.takeCompletedContainers());
+      Thread.sleep(10);
+    }
+
+    // wait for the completed containers from the second heartbeat's response
+    while (callbackHandler.takeCompletedContainers() == null) {
+      Thread.sleep(10);
+    }
+
+    asyncClient.stop();
+
+    Assert.assertNull(callbackHandler.takeAllocatedContainers());
+    Assert.assertNull(callbackHandler.takeCompletedContainers());
+  }
+  // Builds a response with the given containers, no node updates, no reboot.
+  private AllocateResponse createAllocateResponse(
+      List<ContainerStatus> completed, List<Container> allocated) {
+    AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
+        new ArrayList<NodeReport>(), null, false, 1);
+    return response;
+  }
+  // Handler whose callbacks block until the test thread takes what they delivered.
+  private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
+    private volatile List<ContainerStatus> completedContainers;
+    private volatile List<Container> allocatedContainers;
+    // Returns the pending completed list (or null) and wakes the blocked callback.
+    public List<ContainerStatus> takeCompletedContainers() {
+      List<ContainerStatus> ret = completedContainers;
+      if (ret == null) {
+        return null;
+      }
+      completedContainers = null;
+      synchronized (ret) {
+        ret.notify();
+      }
+      return ret;
+    }
+    // Returns the pending allocated list (or null) and wakes the blocked callback.
+    public List<Container> takeAllocatedContainers() {
+      List<Container> ret = allocatedContainers;
+      if (ret == null) {
+        return null;
+      }
+      allocatedContainers = null;
+      synchronized (ret) {
+        ret.notify();
+      }
+      return ret;
+    }
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> statuses) {
+      completedContainers = statuses;
+      // Lock and wait on the local list: the field may be nulled at any moment.
+      synchronized (statuses) {
+        while (completedContainers != null) {
+          try {
+            statuses.wait();
+          } catch (InterruptedException ex) {
+            LOG.error("Interrupted during wait", ex);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+      allocatedContainers = containers;
+      // Lock and wait on the local list: the field may be nulled at any moment.
+      synchronized (containers) {
+        while (allocatedContainers != null) {
+          try {
+            containers.wait();
+          } catch (InterruptedException ex) {
+            LOG.error("Interrupted during wait", ex);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void onRebootRequest() {}
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+  }
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1459555&r1=1459554&r2=1459555&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Thu Mar 21 22:26:06 2013
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -404,4 +405,21 @@ public class BuilderUtils {
     allocateRequest.addAllReleases(containersToBeReleased);
     return allocateRequest;
   }
+  
+  public static AllocateResponse newAllocateResponse(int responseId,
+      List<ContainerStatus> completedContainers,
+      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+      Resource availResources, boolean reboot, int numClusterNodes) {
+    // Create a fresh AllocateResponse record and copy every argument into it.
+    AllocateResponse allocateResponse =
+        recordFactory.newRecordInstance(AllocateResponse.class);
+    allocateResponse.setResponseId(responseId);
+    allocateResponse.setCompletedContainersStatuses(completedContainers);
+    allocateResponse.setAllocatedContainers(allocatedContainers);
+    allocateResponse.setUpdatedNodes(updatedNodes);
+    allocateResponse.setAvailableResources(availResources);
+    allocateResponse.setReboot(reboot);
+    allocateResponse.setNumClusterNodes(numClusterNodes);
+    return allocateResponse;
+  }
 }