You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/03/21 23:26:06 UTC
svn commit: r1459555 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/
hadoop-yarn/hadoop-yarn-client/src...
Author: bikas
Date: Thu Mar 21 22:26:06 2013
New Revision: 1459555
URL: http://svn.apache.org/r1459555
Log:
YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks. (Sandy Ryza via bikas)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1459555&r1=1459554&r2=1459555&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Mar 21 22:26:06 2013
@@ -88,6 +88,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-297. Improve hashCode implementations for PB records. (Xuan Gong via
hitesh)
+ YARN-417. Create AMRMClient wrapper that provides asynchronous callbacks.
+ (Sandy Ryza via bikas)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1459555&r1=1459554&r2=1459555&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Thu Mar 21 22:26:06 2013
@@ -63,12 +63,12 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient;
import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -147,8 +147,8 @@ public class ApplicationMaster {
private YarnRPC rpc;
// Handle to communicate with the Resource Manager
- private AMRMClient resourceManager;
-
+ private AMRMClientAsync resourceManager;
+
// Application Attempt Id ( combination of attemptId and fail count )
private ApplicationAttemptId appAttemptID;
@@ -169,8 +169,6 @@ public class ApplicationMaster {
// Priority of the request
private int requestPriority;
- // Simple flag to denote whether all works is done
- private boolean appDone = false;
// Counter for completed containers ( complete denotes successful or failed )
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM
@@ -201,6 +199,9 @@ public class ApplicationMaster {
// Hardcoded path to shell script in launch container's local env
private final String ExecShellStringPath = "ExecShellScript.sh";
+ private volatile boolean done;
+ private volatile boolean success;
+
// Launch threads
private List<Thread> launchThreads = new ArrayList<Thread>();
@@ -416,226 +417,202 @@ public class ApplicationMaster {
public boolean run() throws YarnRemoteException {
LOG.info("Starting ApplicationMaster");
- // Connect to ResourceManager
- resourceManager = new AMRMClientImpl(appAttemptID);
+ AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+
+ resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager.init(conf);
resourceManager.start();
- try {
- // Setup local RPC Server to accept status requests directly from clients
- // TODO need to setup a protocol for client to be able to communicate to
- // the RPC server
- // TODO use the rpc port info to register with the RM for the client to
- // send requests to this app master
-
- // Register self with ResourceManager
- RegisterApplicationMasterResponse response = resourceManager
- .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
- appMasterTrackingUrl);
- // Dump out information about cluster capability as seen by the
- // resource manager
- int minMem = response.getMinimumResourceCapability().getMemory();
- int maxMem = response.getMaximumResourceCapability().getMemory();
- LOG.info("Min mem capabililty of resources in this cluster " + minMem);
- LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
- // A resource ask has to be atleast the minimum of the capability of the
- // cluster, the value has to be a multiple of the min value and cannot
- // exceed the max.
- // If it is not an exact multiple of min, the RM will allocate to the
- // nearest multiple of min
- if (containerMemory < minMem) {
- LOG.info("Container memory specified below min threshold of cluster."
- + " Using min value." + ", specified=" + containerMemory + ", min="
- + minMem);
- containerMemory = minMem;
- } else if (containerMemory > maxMem) {
- LOG.info("Container memory specified above max threshold of cluster."
- + " Using max value." + ", specified=" + containerMemory + ", max="
- + maxMem);
- containerMemory = maxMem;
- }
-
- // Setup heartbeat emitter
- // TODO poll RM every now and then with an empty request to let RM know
- // that we are alive
- // The heartbeat interval after which an AM is timed out by the RM is
- // defined by a config setting:
- // RM_AM_EXPIRY_INTERVAL_MS with default defined by
- // DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
- // The allocate calls to the RM count as heartbeats so, for now,
- // this additional heartbeat emitter is not required.
-
- // Setup ask for containers from RM
- // Send request for containers to RM
- // Until we get our fully allocated quota, we keep on polling RM for
- // containers
- // Keep looping until all the containers are launched and shell script
- // executed on them ( regardless of success/failure).
-
- int loopCounter = -1;
-
- while (numCompletedContainers.get() < numTotalContainers && !appDone) {
- loopCounter++;
-
- // log current state
- LOG.info("Current application state: loop=" + loopCounter
- + ", appDone=" + appDone + ", total=" + numTotalContainers
- + ", requested=" + numRequestedContainers + ", completed="
- + numCompletedContainers + ", failed=" + numFailedContainers
- + ", currentAllocated=" + numAllocatedContainers);
-
- // Sleep before each loop when asking RM for containers
- // to avoid flooding RM with spurious requests when it
- // need not have any available containers
- // Sleeping for 1000 ms.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted " + e.getMessage());
- }
+ // Setup local RPC Server to accept status requests directly from clients
+ // TODO need to setup a protocol for client to be able to communicate to
+ // the RPC server
+ // TODO use the rpc port info to register with the RM for the client to
+ // send requests to this app master
+
+ // Register self with ResourceManager
+ // This will start heartbeating to the RM
+ RegisterApplicationMasterResponse response = resourceManager
+ .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ appMasterTrackingUrl);
+ // Dump out information about cluster capability as seen by the
+ // resource manager
+ int minMem = response.getMinimumResourceCapability().getMemory();
+ int maxMem = response.getMaximumResourceCapability().getMemory();
+ LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ // A resource ask has to be at least the minimum of the capability of the
+ // cluster, the value has to be a multiple of the min value and cannot
+ // exceed the max.
+ // If it is not an exact multiple of min, the RM will allocate to the
+ // nearest multiple of min
+ if (containerMemory < minMem) {
+ LOG.info("Container memory specified below min threshold of cluster."
+ + " Using min value." + ", specified=" + containerMemory + ", min="
+ + minMem);
+ containerMemory = minMem;
+ } else if (containerMemory > maxMem) {
+ LOG.info("Container memory specified above max threshold of cluster."
+ + " Using max value." + ", specified=" + containerMemory + ", max="
+ + maxMem);
+ containerMemory = maxMem;
+ }
- // No. of containers to request
- // For the first loop, askCount will be equal to total containers needed
- // From that point on, askCount will always be 0 as current
- // implementation does not change its ask on container failures.
- int askCount = numTotalContainers - numRequestedContainers.get();
- numRequestedContainers.addAndGet(askCount);
-
- if (askCount > 0) {
- ContainerRequest containerAsk = setupContainerAskForRM(askCount);
- resourceManager.addContainerRequest(containerAsk);
- }
- // Send the request to RM
- LOG.info("Asking RM for containers" + ", askCount=" + askCount);
- AllocateResponse allocResp = sendContainerAskToRM();
-
- // Retrieve list of allocated containers from the response
- List<Container> allocatedContainers =
- allocResp.getAllocatedContainers();
- LOG.info("Got response from RM for container ask, allocatedCnt="
- + allocatedContainers.size());
- numAllocatedContainers.addAndGet(allocatedContainers.size());
- for (Container allocatedContainer : allocatedContainers) {
- LOG.info("Launching shell command on a new container."
- + ", containerId=" + allocatedContainer.getId()
- + ", containerNode=" + allocatedContainer.getNodeId().getHost()
- + ":" + allocatedContainer.getNodeId().getPort()
- + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
- + ", containerState" + allocatedContainer.getState()
- + ", containerResourceMemory"
- + allocatedContainer.getResource().getMemory());
- // + ", containerToken"
- // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
- LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
- allocatedContainer);
- Thread launchThread = new Thread(runnableLaunchContainer);
-
- // launch and start the container on a separate thread to keep
- // the main thread unblocked
- // as all containers may not be allocated at one go.
- launchThreads.add(launchThread);
- launchThread.start();
- }
+ // Setup ask for containers from RM
+ // Send request for containers to RM
+ // Until we get our fully allocated quota, we keep on polling RM for
+ // containers
+ // Keep looping until all the containers are launched and shell script
+ // executed on them ( regardless of success/failure).
+ ContainerRequest containerAsk = setupContainerAskForRM(numTotalContainers);
+ resourceManager.addContainerRequest(containerAsk);
+ numRequestedContainers.set(numTotalContainers);
- // Check what the current available resources in the cluster are
- // TODO should we do anything if the available resources are not enough?
- Resource availableResources = allocResp.getAvailableResources();
- LOG.info("Current available resources in the cluster "
- + availableResources);
-
- // Check the completed containers
- List<ContainerStatus> completedContainers = allocResp
- .getCompletedContainersStatuses();
- LOG.info("Got response from RM for container ask, completedCnt="
- + completedContainers.size());
- for (ContainerStatus containerStatus : completedContainers) {
- LOG.info("Got container status for containerID="
- + containerStatus.getContainerId() + ", state="
- + containerStatus.getState() + ", exitStatus="
- + containerStatus.getExitStatus() + ", diagnostics="
- + containerStatus.getDiagnostics());
-
- // non complete containers should not be here
- assert (containerStatus.getState() == ContainerState.COMPLETE);
-
- // increment counters for completed/failed containers
- int exitStatus = containerStatus.getExitStatus();
- if (0 != exitStatus) {
- // container failed
- if (-100 != exitStatus) {
- // shell script failed
- // counts as completed
- numCompletedContainers.incrementAndGet();
- numFailedContainers.incrementAndGet();
- } else {
- // something else bad happened
- // app job did not complete for some reason
- // we should re-try as the container was lost for some reason
- numAllocatedContainers.decrementAndGet();
- numRequestedContainers.decrementAndGet();
- // we do not need to release the container as it would be done
- // by the RM/CM.
- }
- } else {
- // nothing to do
- // container completed successfully
+ while (!done) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex) {}
+ }
+ finish();
+
+ return success;
+ }
+
+ private void finish() {
+ // Join all launched threads
+ // needed for when we time out
+ // and we need to release containers
+ for (Thread launchThread : launchThreads) {
+ try {
+ launchThread.join(10000);
+ } catch (InterruptedException e) {
+ LOG.info("Exception thrown in thread join: " + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+
+ // When the application completes, it should send a finish application
+ // signal to the RM
+ LOG.info("Application completed. Signalling finish to RM");
+
+ FinalApplicationStatus appStatus;
+ String appMessage = null;
+ success = true;
+ if (numFailedContainers.get() == 0) {
+ appStatus = FinalApplicationStatus.SUCCEEDED;
+ } else {
+ appStatus = FinalApplicationStatus.FAILED;
+ appMessage = "Diagnostics." + ", total=" + numTotalContainers
+ + ", completed=" + numCompletedContainers.get() + ", allocated="
+ + numAllocatedContainers.get() + ", failed="
+ + numFailedContainers.get();
+ success = false;
+ }
+ try {
+ resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
+ } catch (YarnRemoteException ex) {
+ LOG.error("Failed to unregister application", ex);
+ }
+
+ done = true;
+ resourceManager.stop();
+ }
+
+ private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+ LOG.info("Got response from RM for container ask, completedCnt="
+ + completedContainers.size());
+ for (ContainerStatus containerStatus : completedContainers) {
+ LOG.info("Got container status for containerID="
+ + containerStatus.getContainerId() + ", state="
+ + containerStatus.getState() + ", exitStatus="
+ + containerStatus.getExitStatus() + ", diagnostics="
+ + containerStatus.getDiagnostics());
+
+ // non complete containers should not be here
+ assert (containerStatus.getState() == ContainerState.COMPLETE);
+
+ // increment counters for completed/failed containers
+ int exitStatus = containerStatus.getExitStatus();
+ if (0 != exitStatus) {
+ // container failed
+ if (YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS != exitStatus) {
+ // shell script failed
+ // counts as completed
numCompletedContainers.incrementAndGet();
- LOG.info("Container completed successfully." + ", containerId="
- + containerStatus.getContainerId());
+ numFailedContainers.incrementAndGet();
+ } else {
+ // container was killed by framework, possibly preempted
+ // we should re-try as the container was lost for some reason
+ numAllocatedContainers.decrementAndGet();
+ numRequestedContainers.decrementAndGet();
+ // we do not need to release the container as it would be done
+ // by the RM
}
+ } else {
+ // nothing to do
+ // container completed successfully
+ numCompletedContainers.incrementAndGet();
+ LOG.info("Container completed successfully." + ", containerId="
+ + containerStatus.getContainerId());
}
- if (numCompletedContainers.get() == numTotalContainers) {
- appDone = true;
- }
-
- LOG.info("Current application state: loop=" + loopCounter
- + ", appDone=" + appDone + ", total=" + numTotalContainers
- + ", requested=" + numRequestedContainers + ", completed="
- + numCompletedContainers + ", failed=" + numFailedContainers
- + ", currentAllocated=" + numAllocatedContainers);
-
- // TODO
- // Add a timeout handling layer
- // for misbehaving shell commands
}
-
- // Join all launched threads
- // needed for when we time out
- // and we need to release containers
- for (Thread launchThread : launchThreads) {
- try {
- launchThread.join(10000);
- } catch (InterruptedException e) {
- LOG.info("Exception thrown in thread join: " + e.getMessage());
- e.printStackTrace();
- }
+
+ // ask for more containers if any failed
+ int askCount = numTotalContainers - numRequestedContainers.get();
+ numRequestedContainers.addAndGet(askCount);
+
+ if (askCount > 0) {
+ ContainerRequest containerAsk = setupContainerAskForRM(askCount);
+ resourceManager.addContainerRequest(containerAsk);
+ }
+
+ // set progress to deliver to RM on next heartbeat
+ float progress = (float) numCompletedContainers.get()
+ / numTotalContainers;
+ resourceManager.setProgress(progress);
+
+ if (numCompletedContainers.get() == numTotalContainers) {
+ done = true;
}
+ }
- // When the application completes, it should send a finish application
- // signal to the RM
- LOG.info("Application completed. Signalling finish to RM");
-
- FinalApplicationStatus appStatus;
- String appMessage = null;
- boolean isSuccess = true;
- if (numFailedContainers.get() == 0) {
- appStatus = FinalApplicationStatus.SUCCEEDED;
- } else {
- appStatus = FinalApplicationStatus.FAILED;
- appMessage = "Diagnostics." + ", total=" + numTotalContainers
- + ", completed=" + numCompletedContainers.get() + ", allocated="
- + numAllocatedContainers.get() + ", failed="
- + numFailedContainers.get();
- isSuccess = false;
+ @Override
+ public void onContainersAllocated(List<Container> allocatedContainers) {
+ LOG.info("Got response from RM for container ask, allocatedCnt="
+ + allocatedContainers.size());
+ numAllocatedContainers.addAndGet(allocatedContainers.size());
+ for (Container allocatedContainer : allocatedContainers) {
+ LOG.info("Launching shell command on a new container."
+ + ", containerId=" + allocatedContainer.getId()
+ + ", containerNode=" + allocatedContainer.getNodeId().getHost()
+ + ":" + allocatedContainer.getNodeId().getPort()
+ + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+ + ", containerState" + allocatedContainer.getState()
+ + ", containerResourceMemory"
+ + allocatedContainer.getResource().getMemory());
+ // + ", containerToken"
+ // +allocatedContainer.getContainerToken().getIdentifier().toString());
+
+ LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+ allocatedContainer);
+ Thread launchThread = new Thread(runnableLaunchContainer);
+
+ // launch and start the container on a separate thread to keep
+ // the main thread unblocked
+ // as all containers may not be allocated at one go.
+ launchThreads.add(launchThread);
+ launchThread.start();
}
- resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
- return isSuccess;
- } finally {
- resourceManager.stop();
}
+
+ @Override
+ public void onRebootRequest() {}
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {}
}
/**
@@ -811,21 +788,4 @@ public class ApplicationMaster {
LOG.info("Requested container ask: " + request.toString());
return request;
}
-
- /**
- * Ask RM to allocate given no. of containers to this Application Master
- *
- * @param requestedContainers Containers to ask for from RM
- * @return Response from RM to AM with allocated containers
- * @throws YarnRemoteException
- */
- private AllocateResponse sendContainerAskToRM() throws YarnRemoteException {
- float progressIndicator = (float) numCompletedContainers.get()
- / numTotalContainers;
-
- LOG.info("Sending request to RM for containers" + ", progress="
- + progressIndicator);
-
- return resourceManager.allocate(progressIndicator);
- }
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java?rev=1459555&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java Thu Mar 21 22:26:06 2013
@@ -0,0 +1,354 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>AMRMClientAsync</code> handles communication with the ResourceManager
+ * and provides asynchronous updates on events such as container allocations and
+ * completions. It contains a thread that sends periodic heartbeats to the
+ * ResourceManager.
+ *
+ * It should be used by implementing a CallbackHandler:
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ * public void onContainersAllocated(List<Container> containers) {
+ * [run tasks on the containers]
+ * }
+ *
+ * public void onContainersCompleted(List<ContainerStatus> statuses) {
+ * [update progress, check whether app is done]
+ * }
+ *
+ * public void onNodesUpdated(List<NodeReport> updated) {}
+ *
+ * public void onRebootRequest() {}
+ * }
+ * }
+ * </pre>
+ *
+ * The client's lifecycle should be managed similarly to the following:
+ *
+ * <pre>
+ * {@code
+ * AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackHandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * RegisterApplicationMasterResponse response = asyncClient
+ * .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ * appMasterTrackingUrl);
+ * asyncClient.addContainerRequest(containerRequest);
+ * [... wait for application to complete]
+ * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Unstable
+@Evolving
+public class AMRMClientAsync extends AbstractService {
+
+ private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
+
+ private final AMRMClient client;
+ private final int intervalMs;
+ private final HeartbeatThread heartbeatThread;
+ private final CallbackHandlerThread handlerThread;
+ private final CallbackHandler handler;
+
+ private final BlockingQueue<AllocateResponse> responseQueue;
+
+ private volatile boolean keepRunning;
+ private volatile float progress;
+
+ public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
+ CallbackHandler callbackHandler) {
+ this(new AMRMClientImpl(id), intervalMs, callbackHandler);
+ }
+
+ @Private
+ @VisibleForTesting
+ AMRMClientAsync(AMRMClient client, int intervalMs,
+ CallbackHandler callbackHandler) {
+ super(AMRMClientAsync.class.getName());
+ this.client = client;
+ this.intervalMs = intervalMs;
+ handler = callbackHandler;
+ heartbeatThread = new HeartbeatThread();
+ handlerThread = new CallbackHandlerThread();
+ responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+ keepRunning = true;
+ }
+
+ /**
+ * Sets the application's current progress. It will be transmitted to the
+ * resource manager on the next heartbeat.
+ * @param progress
+ * the application's progress so far
+ */
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ super.init(conf);
+ client.init(conf);
+ }
+
+ @Override
+ public void start() {
+ handlerThread.start();
+ client.start();
+ super.start();
+ }
+
+ /**
+ * Tells the heartbeat and handler threads to stop and waits for them to
+ * terminate. Calling this method from the callback handler thread would cause
+ * deadlock, and thus should be avoided.
+ */
+ @Override
+ public void stop() {
+ if (Thread.currentThread() == handlerThread) {
+ throw new YarnException("Cannot call stop from callback handler thread!");
+ }
+ keepRunning = false;
+ try {
+ heartbeatThread.join();
+ } catch (InterruptedException ex) {
+ LOG.error("Error joining with heartbeat thread", ex);
+ }
+ client.stop();
+ try {
+ handlerThread.interrupt();
+ handlerThread.join();
+ } catch (InterruptedException ex) {
+ LOG.error("Error joining with hander thread", ex);
+ }
+ super.stop();
+ }
+
+ /**
+ * Registers this application master with the resource manager. On successful
+ * registration, starts the heartbeating thread.
+ */
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ String appHostName, int appHostPort, String appTrackingUrl)
+ throws YarnRemoteException {
+ RegisterApplicationMasterResponse response =
+ client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
+ heartbeatThread.start();
+ return response;
+ }
+
+ /**
+ * Unregister the application master. This must be called in the end.
+ * @param appStatus Success/Failure status of the master
+ * @param appMessage Diagnostics message on failure
+ * @param appTrackingUrl New URL to get master info
+ * @throws YarnRemoteException
+ */
+ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+ String appMessage, String appTrackingUrl) throws YarnRemoteException {
+ synchronized (client) {
+ keepRunning = false;
+ client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
+ }
+ }
+
+ /**
+ * Request containers for resources before calling <code>allocate</code>
+ * @param req Resource request
+ */
+ public void addContainerRequest(AMRMClient.ContainerRequest req) {
+ client.addContainerRequest(req);
+ }
+
+ /**
+ * Remove previous container request. The previous container request may have
+ * already been sent to the ResourceManager, so even after the remove request
+ * the app must be prepared to receive an allocation for the previous
+ * request.
+ * @param req Resource request
+ */
+ public void removeContainerRequest(AMRMClient.ContainerRequest req) {
+ client.removeContainerRequest(req);
+ }
+
+ /**
+ * Release containers assigned by the Resource Manager. If the app cannot use
+ * the container or wants to give up the container then it can release them.
+ * The app needs to make new requests for the released resource capability if
+ * it still needs it. eg. it released non-local resources
+ * @param containerId
+ */
+ public void releaseAssignedContainer(ContainerId containerId) {
+ client.releaseAssignedContainer(containerId);
+ }
+
+ /**
+ * Get the currently available resources in the cluster.
+ * A valid value is available after a call to allocate has been made
+ * @return Currently available resources
+ */
+ public Resource getClusterAvailableResources() {
+ return client.getClusterAvailableResources();
+ }
+
+ /**
+ * Get the current number of nodes in the cluster.
+ * A valid value is available after a call to allocate has been made
+ * @return Current number of nodes in the cluster
+ */
+ public int getClusterNodeCount() {
+ return client.getClusterNodeCount();
+ }
+
+ private class HeartbeatThread extends Thread {
+ public HeartbeatThread() {
+ super("AMRM Heartbeater thread");
+ }
+
+ public void run() {
+ while (true) {
+ AllocateResponse response = null;
+ // synchronization ensures we don't send heartbeats after unregistering
+ synchronized (client) {
+ if (!keepRunning) {
+ break;
+ }
+
+ try {
+ response = client.allocate(progress);
+ } catch (YarnRemoteException ex) {
+ LOG.error("Failed to heartbeat", ex);
+ }
+ }
+ if (response != null) {
+ while (true) {
+ try {
+ responseQueue.put(response);
+ break;
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while waiting to put on response queue", ex);
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(intervalMs);
+ } catch (InterruptedException ex) {
+ LOG.warn("Heartbeater interrupted", ex);
+ }
+ }
+ }
+ }
+
+ private class CallbackHandlerThread extends Thread {
+ public CallbackHandlerThread() {
+ super("AMRM Callback Handler Thread");
+ }
+
+ public void run() {
+ while (keepRunning) {
+ AllocateResponse response;
+ try {
+ response = responseQueue.take();
+ } catch (InterruptedException ex) {
+ LOG.info("Interrupted while waiting for queue");
+ continue;
+ }
+
+ if (response.getReboot()) {
+ handler.onRebootRequest();
+ }
+ List<NodeReport> updatedNodes = response.getUpdatedNodes();
+ if (!updatedNodes.isEmpty()) {
+ handler.onNodesUpdated(updatedNodes);
+ }
+
+ List<ContainerStatus> completed =
+ response.getCompletedContainersStatuses();
+ if (!completed.isEmpty()) {
+ handler.onContainersCompleted(completed);
+ }
+
+ List<Container> allocated = response.getAllocatedContainers();
+ if (!allocated.isEmpty()) {
+ handler.onContainersAllocated(allocated);
+ }
+ }
+ }
+ }
+
+ public interface CallbackHandler {
+
+ /**
+ * Called when the ResourceManager responds to a heartbeat with completed
+ * containers. If the response contains both completed containers and
+ * allocated containers, this will be called before onContainersAllocated.
+ */
+ public void onContainersCompleted(List<ContainerStatus> statuses);
+
+ /**
+ * Called when the ResourceManager responds to a heartbeat with allocated
+ * containers. If the response contains both completed containers and
+ * allocated containers, this will be called after onContainersCompleted.
+ */
+ public void onContainersAllocated(List<Container> containers);
+
+ /**
+ * Called when the ResourceManager wants the ApplicationMaster to reboot
+ * for being out of sync.
+ */
+ public void onRebootRequest();
+
+ /**
+ * Called when nodes tracked by the ResourceManager have changed in health,
+ * availability etc.
+ */
+ public void onNodesUpdated(List<NodeReport> updatedNodes);
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java?rev=1459555&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java Thu Mar 21 22:26:06 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.mockito.Mockito.anyFloat;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestAMRMClientAsync {
+
+ private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
+
+ @Test(timeout=10000)
+ public void testAMRMClientAsync() throws Exception {
+ Configuration conf = new Configuration();
+ List<ContainerStatus> completed1 = Arrays.asList(
+ BuilderUtils.newContainerStatus(
+ BuilderUtils.newContainerId(0, 0, 0, 0),
+ ContainerState.COMPLETE, "", 0));
+ List<Container> allocated1 = Arrays.asList(
+ BuilderUtils.newContainer(null, null, null, null, null, null));
+ final AllocateResponse response1 = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), allocated1);
+ final AllocateResponse response2 = createAllocateResponse(completed1,
+ new ArrayList<Container>());
+ final AllocateResponse emptyResponse = createAllocateResponse(
+ new ArrayList<ContainerStatus>(), new ArrayList<Container>());
+
+ TestCallbackHandler callbackHandler = new TestCallbackHandler();
+ AMRMClient client = mock(AMRMClient.class);
+ final AtomicBoolean secondHeartbeatReceived = new AtomicBoolean(false);
+ when(client.allocate(anyFloat())).thenReturn(response1).thenAnswer(new Answer<AllocateResponse>() {
+ @Override
+ public AllocateResponse answer(InvocationOnMock invocation)
+ throws Throwable {
+ secondHeartbeatReceived.set(true);
+ return response2;
+ }
+ }).thenReturn(emptyResponse);
+ when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
+ .thenReturn(null);
+
+ AMRMClientAsync asyncClient = new AMRMClientAsync(client, 20, callbackHandler);
+ asyncClient.init(conf);
+ asyncClient.start();
+ asyncClient.registerApplicationMaster("localhost", 1234, null);
+
+ // while the CallbackHandler will still only be processing the first response,
+ // heartbeater thread should still be sending heartbeats.
+ // To test this, wait for the second heartbeat to be received.
+ while (!secondHeartbeatReceived.get()) {
+ Thread.sleep(10);
+ }
+
+ // allocated containers should come before completed containers
+ Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+
+ // wait for the allocated containers from the first heartbeat's response
+ while (callbackHandler.takeAllocatedContainers() == null) {
+ Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+ Thread.sleep(10);
+ }
+
+ // wait for the completed containers from the second heartbeat's response
+ while (callbackHandler.takeCompletedContainers() == null) {
+ Thread.sleep(10);
+ }
+
+ asyncClient.stop();
+
+ Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
+ Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+ }
+
+ private AllocateResponse createAllocateResponse(
+ List<ContainerStatus> completed, List<Container> allocated) {
+ AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
+ new ArrayList<NodeReport>(), null, false, 1);
+ return response;
+ }
+
+ private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ private volatile List<ContainerStatus> completedContainers;
+ private volatile List<Container> allocatedContainers;
+
+ public List<ContainerStatus> takeCompletedContainers() {
+ List<ContainerStatus> ret = completedContainers;
+ if (ret == null) {
+ return null;
+ }
+ completedContainers = null;
+ synchronized (ret) {
+ ret.notify();
+ }
+ return ret;
+ }
+
+ public List<Container> takeAllocatedContainers() {
+ List<Container> ret = allocatedContainers;
+ if (ret == null) {
+ return null;
+ }
+ allocatedContainers = null;
+ synchronized (ret) {
+ ret.notify();
+ }
+ return ret;
+ }
+
+ @Override
+ public void onContainersCompleted(List<ContainerStatus> statuses) {
+ completedContainers = statuses;
+ // wait for containers to be taken before returning
+ synchronized (completedContainers) {
+ while (completedContainers != null) {
+ try {
+ completedContainers.wait();
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted during wait", ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onContainersAllocated(List<Container> containers) {
+ allocatedContainers = containers;
+ // wait for containers to be taken before returning
+ synchronized (allocatedContainers) {
+ while (allocatedContainers != null) {
+ try {
+ allocatedContainers.wait();
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted during wait", ex);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void onRebootRequest() {}
+
+ @Override
+ public void onNodesUpdated(List<NodeReport> updatedNodes) {}
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1459555&r1=1459554&r2=1459555&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Thu Mar 21 22:26:06 2013
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -404,4 +405,21 @@ public class BuilderUtils {
allocateRequest.addAllReleases(containersToBeReleased);
return allocateRequest;
}
+
+ /**
+  * Creates an {@link AllocateResponse} record through the record factory
+  * and fills in every field from the supplied arguments.
+  *
+  * @param responseId id to set on the response
+  * @param completedContainers statuses of completed containers
+  * @param allocatedContainers newly allocated containers
+  * @param updatedNodes reports for nodes that were updated
+  * @param availResources available resources to report
+  * @param reboot value of the reboot flag
+  * @param numClusterNodes number of cluster nodes to report
+  * @return the populated response record
+  */
+ public static AllocateResponse newAllocateResponse(int responseId,
+     List<ContainerStatus> completedContainers,
+     List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+     Resource availResources, boolean reboot, int numClusterNodes) {
+   AllocateResponse allocateResponse =
+       recordFactory.newRecordInstance(AllocateResponse.class);
+
+   // Populate the record in the same order as the parameter list.
+   allocateResponse.setResponseId(responseId);
+   allocateResponse.setCompletedContainersStatuses(completedContainers);
+   allocateResponse.setAllocatedContainers(allocatedContainers);
+   allocateResponse.setUpdatedNodes(updatedNodes);
+   allocateResponse.setAvailableResources(availResources);
+   allocateResponse.setReboot(reboot);
+   allocateResponse.setNumClusterNodes(numClusterNodes);
+
+   return allocateResponse;
+ }
}