You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/18 06:02:48 UTC
svn commit: r1494017 [1/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/
hadoop-yarn/hadoop-yarn-appl...
Author: vinodkv
Date: Tue Jun 18 04:02:47 2013
New Revision: 1494017
URL: http://svn.apache.org/r1494017
Log:
YARN-834. Fixed annotations for yarn-client module, reorganized packages and clearly differentiated *Async apis. Contributed by Arun C Murthy and Zhijie Shen.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
Removed:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jun 18 04:02:47 2013
@@ -186,6 +186,10 @@ Release 2.1.0-beta - UNRELEASED
YARN-610. ClientToken is no longer set in the environment of the Containers.
(Omkar Vinit Joshi via vinodkv)
+ YARN-834. Fixed annotations for yarn-client module, reorganized packages and
+ clearly differentiated *Async apis. (Arun C Murthy and Zhijie Shen via
+ vinodkv)
+
NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues.
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Jun 18 04:02:47 2013
@@ -67,9 +67,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -436,17 +436,18 @@ public class ApplicationMaster {
* @throws YarnException
* @throws IOException
*/
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings({ "unchecked" })
public boolean run() throws YarnException, IOException {
LOG.info("Starting ApplicationMaster");
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
- resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
+ resourceManager =
+ AMRMClientAsync.createAMRMClientAsync(appAttemptID, 1000, allocListener);
resourceManager.init(conf);
resourceManager.start();
containerListener = new NMCallbackHandler();
- nmClientAsync = new NMClientAsync(containerListener);
+ nmClientAsync = NMClientAsync.createNMClientAsync(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
@@ -682,7 +683,7 @@ public class ApplicationMaster {
}
Container container = containers.get(containerId);
if (container != null) {
- nmClientAsync.getContainerStatus(containerId, container.getNodeId(),
+ nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId(),
container.getContainerToken());
}
}
@@ -802,7 +803,7 @@ public class ApplicationMaster {
ctx.setCommands(commands);
containerListener.addContainer(container.getId(), container);
- nmClientAsync.startContainer(container, ctx);
+ nmClientAsync.startContainerAsync(container, ctx);
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Tue Jun 18 04:02:47 2013
@@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java?rev=1494017&r1=1494016&r2=1494017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java Tue Jun 18 04:02:47 2013
@@ -48,8 +48,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,262 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.collect.ImmutableList;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
+ AbstractService {
+
+ /**
+ * Create a new instance of AMRMClient.
+ * For usage:
+ * <pre>
+ * {@code
+ * AMRMClient.<T>createAMRMClient(appAttemptId)
+ * }</pre>
+ * @param appAttemptId the appAttemptId associated with the AMRMClient
+ * @return the newly created AMRMClient instance.
+ */
+ @Public
+ public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient(
+ ApplicationAttemptId appAttemptId) {
+ AMRMClient<T> client = new AMRMClientImpl<T>(appAttemptId);
+ return client;
+ }
+
+ @Private
+ protected AMRMClient(String name) {
+ super(name);
+ }
+
+ /**
+ * Object to represent container request for resources. Scheduler
+ * documentation should be consulted for the specifics of how the parameters
+ * are honored.
+ * All getters return immutable values.
+ *
+ * @param capability
+ * The {@link Resource} to be requested for each container.
+ * @param nodes
+ * Any hosts to request that the containers are placed on.
+ * @param racks
+ * Any racks to request that the containers are placed on. The racks
+ * corresponding to any hosts requested will be automatically added to
+ * this list.
+ * @param priority
+ * The priority at which to request the containers. Higher priorities have
+ * lower numerical values.
+ * @param containerCount
+ * The number of containers to request.
+ */
+ public static class ContainerRequest {
+ final Resource capability;
+ final List<String> nodes;
+ final List<String> racks;
+ final Priority priority;
+ final int containerCount;
+
+ public ContainerRequest(Resource capability, String[] nodes,
+ String[] racks, Priority priority, int containerCount) {
+ this.capability = capability;
+ this.nodes = (nodes != null ? ImmutableList.copyOf(nodes) : null);
+ this.racks = (racks != null ? ImmutableList.copyOf(racks) : null);
+ this.priority = priority;
+ this.containerCount = containerCount;
+ }
+
+ public Resource getCapability() {
+ return capability;
+ }
+
+ public List<String> getNodes() {
+ return nodes;
+ }
+
+ public List<String> getRacks() {
+ return racks;
+ }
+
+ public Priority getPriority() {
+ return priority;
+ }
+
+ public int getContainerCount() {
+ return containerCount;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Capability[").append(capability).append("]");
+ sb.append("Priority[").append(priority).append("]");
+ sb.append("ContainerCount[").append(containerCount).append("]");
+ return sb.toString();
+ }
+ }
+
+ /**
+ * This creates a <code>ContainerRequest</code> for 1 container and the
+ * AMRMClient stores this request internally. <code>getMatchingRequests</code>
+ * can be used to retrieve these requests from AMRMClient. These requests may
+ * be matched with an allocated container to determine which request to assign
+ * the container to. <code>removeContainerRequest</code> must be called using
+ * the same assigned <code>StoredContainerRequest</code> object so that
+ * AMRMClient can remove it from its internal store.
+ */
+ public static class StoredContainerRequest extends ContainerRequest {
+ public StoredContainerRequest(Resource capability, String[] nodes,
+ String[] racks, Priority priority) {
+ super(capability, nodes, racks, priority, 1);
+ }
+ }
+
+ /**
+ * Register the application master. This must be called before any
+ * other interaction
+ * @param appHostName Name of the host on which master is running
+ * @param appHostPort Port master is listening on
+ * @param appTrackingUrl URL at which the master info can be seen
+ * @return <code>RegisterApplicationMasterResponse</code>
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract RegisterApplicationMasterResponse
+ registerApplicationMaster(String appHostName,
+ int appHostPort,
+ String appTrackingUrl)
+ throws YarnException, IOException;
+
+ /**
+ * Request additional containers and receive new container allocations.
+ * Requests made via <code>addContainerRequest</code> are sent to the
+ * <code>ResourceManager</code>. New containers assigned to the master are
+ * retrieved. Status of completed containers and node health updates are
+ * also retrieved.
+ * This also doubles up as a heartbeat to the ResourceManager and must be
+ * made periodically.
+ * The call may not always return any new allocations of containers.
+ * App should not make concurrent allocate requests. May cause request loss.
+ * @param progressIndicator Indicates progress made by the master
+ * @return the response of the allocate request
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract AllocateResponse allocate(float progressIndicator)
+ throws YarnException, IOException;
+
+ /**
+ * Unregister the application master. This must be called in the end.
+ * @param appStatus Success/Failure status of the master
+ * @param appMessage Diagnostics message on failure
+ * @param appTrackingUrl New URL to get master info
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+ String appMessage,
+ String appTrackingUrl)
+ throws YarnException, IOException;
+
+ /**
+ * Request containers for resources before calling <code>allocate</code>
+ * @param req Resource request
+ */
+ public abstract void addContainerRequest(T req);
+
+ /**
+ * Remove previous container request. The previous container request may have
+ * already been sent to the ResourceManager, so even after the remove request
+ * the app must be prepared to receive an allocation for the previous
+ * request.
+ * @param req Resource request
+ */
+ public abstract void removeContainerRequest(T req);
+
+ /**
+ * Release containers assigned by the Resource Manager. If the app cannot use
+ * the container or wants to give up the container then it can release them.
+ * The app needs to make new requests for the released resource capability if
+ * it still needs it, e.g. if it released non-local resources
+ * @param containerId
+ */
+ public abstract void releaseAssignedContainer(ContainerId containerId);
+
+ /**
+ * Get the currently available resources in the cluster.
+ * A valid value is available after a call to allocate has been made
+ * @return Currently available resources
+ */
+ public abstract Resource getClusterAvailableResources();
+
+ /**
+ * Get the current number of nodes in the cluster.
+ * A valid value is available after a call to allocate has been made
+ * @return Current number of nodes in the cluster
+ */
+ public abstract int getClusterNodeCount();
+
+ /**
+ * Get outstanding <code>StoredContainerRequest</code>s matching the given
+ * parameters. These StoredContainerRequests should have been added via
+ * <code>addContainerRequest</code> earlier in the lifecycle. For performance,
+ * the AMRMClient may return its internal collection directly without creating
+ * a copy. Users should not perform mutable operations on the return value.
+ * Each collection in the list contains requests with identical
+ * <code>Resource</code> size that fit in the given capability. In a
+ * collection, requests will be returned in the same order as they were added.
+ * @return Collection of requests matching the parameters
+ */
+ public abstract List<? extends Collection<T>> getMatchingRequests(
+ Priority priority,
+ String resourceName,
+ Resource capability);
+
+ /**
+ * It returns the NMToken received on allocate call. It will not communicate
+ * with RM to get NMTokens. On allocate call whenever we receive new token
+ * along with container AMRMClient will cache this NMToken per node manager.
+ * This map returned should be shared with any application which is
+ * communicating with NodeManager (e.g. NMClient) using NMTokens. If a new
+ * NMToken is received for the same node manager then it will be replaced.
+ */
+ public abstract ConcurrentMap<String, Token> getNMTokens();
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class NMClient extends AbstractService {
+
+ /**
+ * Create a new instance of NMClient.
+ */
+ @Public
+ public static NMClient createNMClient() {
+ NMClient client = new NMClientImpl();
+ return client;
+ }
+
+ /**
+ * Create a new instance of NMClient with the given name.
+ */
+ @Public
+ public static NMClient createNMClient(String name) {
+ NMClient client = new NMClientImpl(name);
+ return client;
+ }
+
+ @Private
+ protected NMClient(String name) {
+ super(name);
+ }
+
+ /**
+ * <p>Start an allocated container.</p>
+ *
+ * <p>The <code>ApplicationMaster</code> or other applications that use the
+ * client must provide the details of the allocated container, including the
+ * Id, the assigned node's Id and the token via {@link Container}. In
+ * addition, the AM needs to provide the {@link ContainerLaunchContext} as
+ * well.</p>
+ *
+ * @param container the allocated container
+ * @param containerLaunchContext the context information needed by the
+ * <code>NodeManager</code> to launch the
+ * container
+ * @return a map between the auxiliary service names and their outputs
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract Map<String, ByteBuffer> startContainer(Container container,
+ ContainerLaunchContext containerLaunchContext)
+ throws YarnException, IOException;
+
+ /**
+ * <p>Stop a started container.</p>
+ *
+ * @param containerId the Id of the started container
+ * @param nodeId the Id of the <code>NodeManager</code>
+ * @param containerToken the security token to verify authenticity of the
+ * started container
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract void stopContainer(ContainerId containerId, NodeId nodeId,
+ Token containerToken) throws YarnException, IOException;
+
+ /**
+ * <p>Query the status of a container.</p>
+ *
+ * @param containerId the Id of the started container
+ * @param nodeId the Id of the <code>NodeManager</code>
+ * @param containerToken the security token to verify authenticity of the
+ * started container
+ * @return the status of a container
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
+ Token containerToken) throws YarnException, IOException;
+
+ /**
+ * <p>Set whether the containers that are started by this client, and are
+ * still running should be stopped when the client stops. By default, the
+ * feature should be enabled.</p>
+ *
+ * @param enabled whether the feature is enabled or not
+ */
+ public abstract void cleanupRunningContainersOnStop(boolean enabled);
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,295 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class YarnClient extends AbstractService {
+
+ /**
+ * Build a {@link YarnClient} backed by a default-constructed YarnClientImpl.
+ */
+ @Public
+ public static YarnClient createYarnClient() {
+ // Hand the implementation back typed as the abstract YarnClient.
+ return new YarnClientImpl();
+ }
+
+ /**
+ * Build a {@link YarnClient} whose implementation receives rmAddress.
+ */
+ @Public
+ public static YarnClient createYarnClient(InetSocketAddress rmAddress) {
+ // rmAddress is forwarded to the YarnClientImpl constructor unchanged.
+ return new YarnClientImpl(rmAddress);
+ }
+
+ /**
+ * Build a {@link YarnClient} whose implementation receives name and rmAddress.
+ */
+ @Public
+ public static YarnClient createYarnClient(String name,
+ InetSocketAddress rmAddress) {
+ // Both arguments are forwarded to the YarnClientImpl constructor unchanged.
+ return new YarnClientImpl(name, rmAddress);
+ }
+
+ @Private
+ protected YarnClient(String name) {
+ super(name);
+ }
+
+ /**
+ * <p>
+ * Obtain a new {@link ApplicationId} for submitting new applications.
+ * </p>
+ *
+ * <p>
+ * Returns a response which contains {@link ApplicationId} that can be used to
+ * submit a new application. See
+ * {@link #submitApplication(ApplicationSubmissionContext)}.
+ * </p>
+ *
+ * <p>
+ * See {@link GetNewApplicationResponse} for other information that is
+ * returned.
+ * </p>
+ *
+ * @return response containing the new <code>ApplicationId</code> to be used
+ * to submit an application
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract GetNewApplicationResponse getNewApplication() throws YarnException,
+ IOException;
+
+ /**
+ * <p>
+ * Submit a new application to <code>YARN</code>. This is a blocking call: it
+ * does not return the {@link ApplicationId} until the application has been
+ * submitted and accepted by the ResourceManager.
+ * </p>
+ *
+ * @param appContext
+ * {@link ApplicationSubmissionContext} containing all the details
+ * needed to submit a new application
+ * @return {@link ApplicationId} of the accepted application
+ * @throws YarnException
+ * @throws IOException
+ * @see #getNewApplication()
+ */
+ public abstract ApplicationId submitApplication(ApplicationSubmissionContext appContext)
+ throws YarnException, IOException;
+
+ /**
+ * <p>
+ * Kill an application identified by given ID.
+ * </p>
+ *
+ * @param applicationId
+ * {@link ApplicationId} of the application that needs to be killed
+ * @throws YarnException
+ * in case of errors or if YARN rejects the request due to
+ * access-control restrictions.
+ * @throws IOException
+ * @see #getQueueAclsInfo()
+ */
+ public abstract void killApplication(ApplicationId applicationId) throws YarnException,
+ IOException;
+
+ /**
+ * <p>
+ * Get a report of the given Application.
+ * </p>
+ *
+ * <p>
+ * In secure mode, <code>YARN</code> verifies access to the application, queue
+ * etc. before accepting the request.
+ * </p>
+ *
+ * <p>
+ * If the user does not have <code>VIEW_APP</code> access then the following
+ * fields in the report will be set to stubbed values:
+ * <ul>
+ * <li>host - set to "N/A"</li>
+ * <li>RPC port - set to -1</li>
+ * <li>client token - set to "N/A"</li>
+ * <li>diagnostics - set to "N/A"</li>
+ * <li>tracking URL - set to "N/A"</li>
+ * <li>original tracking URL - set to "N/A"</li>
+ * <li>resource usage report - all values are -1</li>
+ * </ul>
+ * </p>
+ *
+ * @param appId
+ * {@link ApplicationId} of the application that needs a report
+ * @return application report
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract ApplicationReport getApplicationReport(ApplicationId appId)
+ throws YarnException, IOException;
+
+ /**
+ * <p>
+ * Get a report (ApplicationReport) of all Applications in the cluster.
+ * </p>
+ *
+ * <p>
+ * If the user does not have <code>VIEW_APP</code> access for an application
+ * then the corresponding report will be filtered as described in
+ * {@link #getApplicationReport(ApplicationId)}.
+ * </p>
+ *
+ * @return a list of reports of all running applications
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract List<ApplicationReport> getApplicationList() throws YarnException,
+ IOException;
+
+ /**
+ * <p>
+ * Get metrics ({@link YarnClusterMetrics}) about the cluster.
+ * </p>
+ *
+ * @return cluster metrics
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
+ IOException;
+
+ /**
+ * <p>
+ * Get a report of all nodes ({@link NodeReport}) in the cluster.
+ * </p>
+ *
+ * @return a list of reports of all nodes
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract List<NodeReport> getNodeReports() throws YarnException, IOException;
+
+ /**
+ * <p>Get a delegation token so as to be able to talk to YARN using
+ * those tokens.</p>
+ *
+ * @param renewer
+ * Address of the renewer who can renew these tokens when needed by
+ * securely talking to YARN.
+ * @return a delegation token ({@link Token}) that can be used to
+ * talk to YARN
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract Token getRMDelegationToken(Text renewer)
+ throws YarnException, IOException;
+
+ /**
+ * <p>
+ * Get information ({@link QueueInfo}) about a given <em>queue</em>.
+ * </p>
+ *
+ * @param queueName
+ * Name of the queue whose information is needed
+ * @return queue information
+ * @throws YarnException
+ * in case of errors or if YARN rejects the request due to
+ * access-control restrictions.
+ * @throws IOException
+ */
+ public abstract QueueInfo getQueueInfo(String queueName) throws YarnException,
+ IOException;
+
+ /**
+ * <p>
+ * Get information ({@link QueueInfo}) about all queues, recursively if there
+ * is a hierarchy
+ * </p>
+ *
+ * @return a list of queue-information for all queues
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract List<QueueInfo> getAllQueues() throws YarnException, IOException;
+
+ /**
+ * <p>
+ * Get information ({@link QueueInfo}) about top level queues.
+ * </p>
+ *
+ * @return a list of queue-information for all the top-level queues
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract List<QueueInfo> getRootQueueInfos() throws YarnException, IOException;
+
+ /**
+ * <p>
+ * Get information ({@link QueueInfo}) about all the immediate children queues
+ * of the given queue
+ * </p>
+ *
+ * @param parent
+ * Name of the queue whose child-queues' information is needed
+ * @return a list of queue-information for all queues who are direct children
+ * of the given parent queue.
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract List<QueueInfo> getChildQueueInfos(String parent) throws YarnException,
+ IOException;
+
+ /**
+ * <p>
+ * Get information about <em>acls</em> for <em>current user</em> on all the
+ * existing queues.
+ * </p>
+ *
+ * @return a list of queue acls ({@link QueueUserACLInfo}) for
+ * <em>current user</em>
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
+ IOException;
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,245 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.async;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>AMRMClientAsync</code> handles communication with the ResourceManager
+ * and provides asynchronous updates on events such as container allocations and
+ * completions. It contains a thread that sends periodic heartbeats to the
+ * ResourceManager.
+ *
+ * It should be used by implementing a CallbackHandler:
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ * public void onContainersAllocated(List<Container> containers) {
+ * [run tasks on the containers]
+ * }
+ *
+ * public void onContainersCompleted(List<ContainerStatus> statuses) {
+ * [update progress, check whether app is done]
+ * }
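+ * public void onNodesUpdated(List<NodeReport> updated) {}
+ * public void onShutdownRequest() {}
+ * public float getProgress() { [return current progress] }
+ * public void onError(Exception e) {}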
+ * }
+ * }
+ * </pre>
+ *
+ * The client's lifecycle should be managed similarly to the following:
+ *
+ * <pre>
+ * {@code
+ * AMRMClientAsync asyncClient =
+ * AMRMClientAsync.createAMRMClientAsync(appAttId, 1000, new MyCallbackHandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * RegisterApplicationMasterResponse response = asyncClient
+ * .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ * appMasterTrackingUrl);
+ * asyncClient.addContainerRequest(containerRequest);
+ * [... wait for application to complete]
+ * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Public
+@Stable
+public abstract class AMRMClientAsync<T extends ContainerRequest>
+extends AbstractService {
+
+ protected final AMRMClient<T> client;
+ protected final CallbackHandler handler;
+ protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
+
+ public static <T extends ContainerRequest> AMRMClientAsync<T>
+ createAMRMClientAsync(
+ ApplicationAttemptId id,
+ int intervalMs,
+ CallbackHandler callbackHandler) {
+ return new AMRMClientAsyncImpl<T>(id, intervalMs, callbackHandler);
+ }
+
+ public static <T extends ContainerRequest> AMRMClientAsync<T>
+ createAMRMClientAsync(
+ AMRMClient<T> client,
+ int intervalMs,
+ CallbackHandler callbackHandler) {
+ return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
+ }
+
+ protected AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
+ CallbackHandler callbackHandler) {
+ this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
+ }
+
+ @Private
+ @VisibleForTesting
+ protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
+ CallbackHandler callbackHandler) {
+ super(AMRMClientAsync.class.getName());
+ this.client = client;
+ this.heartbeatIntervalMs.set(intervalMs);
+ this.handler = callbackHandler;
+ }
+
+ public void setHeartbeatInterval(int interval) {
+ heartbeatIntervalMs.set(interval);
+ }
+
+ public abstract List<? extends Collection<T>> getMatchingRequests(
+ Priority priority,
+ String resourceName,
+ Resource capability);
+
+ /**
+ * Registers this application master with the resource manager. On successful
+ * registration, starts the heartbeating thread.
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract RegisterApplicationMasterResponse registerApplicationMaster(
+ String appHostName, int appHostPort, String appTrackingUrl)
+ throws YarnException, IOException;
+
+ /**
+ * Unregister the application master. This must be called in the end.
+ * @param appStatus Success/Failure status of the master
+ * @param appMessage Diagnostics message on failure
+ * @param appTrackingUrl New URL to get master info
+ * @throws YarnException
+ * @throws IOException
+ */
+ public abstract void unregisterApplicationMaster(
+ FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl)
+ throws YarnException, IOException;
+
+ /**
+ * Request containers for resources before calling <code>allocate</code>
+ * @param req Resource request
+ */
+ public abstract void addContainerRequest(T req);
+
+ /**
+ * Remove a previous container request. The previous container request may
+ * have already been sent to the ResourceManager, so the app must be prepared
+ * to receive an allocation for that request even after it has been
+ * removed.
+ * @param req Resource request
+ */
+ public abstract void removeContainerRequest(T req);
+
+ /**
+ * Release a container assigned by the Resource Manager. If the app cannot use
+ * the container or wants to give it up, it can release it. The app needs to
+ * make new requests for the released resource capability if it still needs
+ * it, e.g. because it released non-local resources.
+ * @param containerId the Id of the container to release
+ */
+ public abstract void releaseAssignedContainer(ContainerId containerId);
+
+ /**
+ * Get the currently available resources in the cluster.
+ * A valid value is available after a call to allocate has been made
+ * @return Currently available resources
+ */
+ public abstract Resource getClusterAvailableResources();
+
+ /**
+ * Get the current number of nodes in the cluster.
+ * A valid value is available after a call to allocate has been made.
+ * @return Current number of nodes in the cluster
+ */
+ public abstract int getClusterNodeCount();
+
+ /**
+ * It returns the NMToken received on allocate call. It will not communicate
+ * with RM to get NMTokens. On allocate call whenever we receive new token
+ * along with new container AMRMClientAsync will cache this NMToken per node
+ * manager. This map returned should be shared with any application which is
+ * communicating with NodeManager (ex. NMClient / NMClientAsync) using
+ * NMTokens. If a new NMToken is received for the same node manager
+ * then it will be replaced.
+ */
+ public abstract ConcurrentMap<String, Token> getNMTokens();
+
+ public interface CallbackHandler {
+
+ /**
+ * Called when the ResourceManager responds to a heartbeat with completed
+ * containers. If the response contains both completed containers and
+ * allocated containers, this is called before {@link #onContainersAllocated(List)}.
+ */
+ public void onContainersCompleted(List<ContainerStatus> statuses);
+
+ /**
+ * Called when the ResourceManager responds to a heartbeat with allocated
+ * containers. If the response contains both completed containers and
+ * allocated containers, this is called after {@link #onContainersCompleted(List)}.
+ */
+ public void onContainersAllocated(List<Container> containers);
+
+ /**
+ * Called when the ResourceManager wants the ApplicationMaster to shut down
+ * for being out of sync etc. The ApplicationMaster should not unregister
+ * with the RM unless the ApplicationMaster wants to be the last attempt.
+ */
+ public void onShutdownRequest();
+
+ /**
+ * Called when nodes tracked by the ResourceManager have changed in health,
+ * availability etc.
+ */
+ public void onNodesUpdated(List<NodeReport> updatedNodes);
+
+ public float getProgress();
+
+ public void onError(Exception e);
+ }
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>NMClientAsync</code> handles communication with all the NodeManagers
+ * and provides asynchronous updates on getting responses from them. It
+ * maintains a thread pool to communicate with individual NMs where a number of
+ * worker threads process requests to NMs by using {@link NMClientImpl}. The max
+ * size of the thread pool is configurable through
+ * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
+ *
+ * It should be used in conjunction with a CallbackHandler. For example
+ *
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
+ * public void onContainerStarted(ContainerId containerId,
+ * Map<String, ByteBuffer> allServiceResponse) {
+ * [post process after the container is started, process the response]
+ * }
+ *
+ * public void onContainerStatusReceived(ContainerId containerId,
+ * ContainerStatus containerStatus) {
+ * [make use of the status of the container]
+ * }
+ *
+ * public void onContainerStopped(ContainerId containerId) {
+ * [post process after the container is stopped]
+ * }
+ *
+ * public void onStartContainerError(
+ * ContainerId containerId, Throwable t) {
+ * [handle the raised exception]
+ * }
+ *
+ * public void onGetContainerStatusError(
+ * ContainerId containerId, Throwable t) {
+ * [handle the raised exception]
+ * }
+ *
+ * public void onStopContainerError(
+ * ContainerId containerId, Throwable t) {
+ * [handle the raised exception]
+ * }
+ * }
+ * }
+ * </pre>
+ *
+ * The client's life-cycle should be managed like the following:
+ *
+ * <pre>
+ * {@code
+ * NMClientAsync asyncClient =
+ * NMClientAsync.createNMClientAsync(new MyCallbackHandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * asyncClient.startContainerAsync(container, containerLaunchContext);
+ * [... wait for container being started]
+ * asyncClient.getContainerStatusAsync(container.getId(), container.getNodeId(),
+ * container.getContainerToken());
+ * [... handle the status in the callback instance]
+ * asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
+ * container.getContainerToken());
+ * [... wait for container being stopped]
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Public
+@Stable
+public abstract class NMClientAsync extends AbstractService {
+
+ protected NMClient client;
+ protected CallbackHandler callbackHandler;
+
+ public static NMClientAsync createNMClientAsync(CallbackHandler callbackHandler) {
+ return new NMClientAsyncImpl(callbackHandler);
+ }
+
+ protected NMClientAsync(CallbackHandler callbackHandler) {
+ this (NMClientAsync.class.getName(), callbackHandler);
+ }
+
+ protected NMClientAsync(String name, CallbackHandler callbackHandler) {
+ this (name, new NMClientImpl(), callbackHandler);
+ }
+
+ @Private // constructor that lets a caller (e.g. a test) supply the NMClient delegate
+ @VisibleForTesting
+ protected NMClientAsync(String name, NMClient client,
+ CallbackHandler callbackHandler) {
+ super(name); // the name is passed through to AbstractService
+ this.client = client; // assigned directly: the public setter is overridable and must not run on a half-built object
+ this.callbackHandler = callbackHandler; // assigned directly for the same reason as client
+ }
+
+ public abstract void startContainerAsync(
+ Container container, ContainerLaunchContext containerLaunchContext);
+
+ public abstract void stopContainerAsync(
+ ContainerId containerId, NodeId nodeId, Token containerToken);
+
+ public abstract void getContainerStatusAsync(
+ ContainerId containerId, NodeId nodeId, Token containerToken);
+
+ public NMClient getClient() {
+ return client;
+ }
+
+ public void setClient(NMClient client) {
+ this.client = client;
+ }
+
+ public CallbackHandler getCallbackHandler() {
+ return callbackHandler;
+ }
+
+ public void setCallbackHandler(CallbackHandler callbackHandler) {
+ this.callbackHandler = callbackHandler;
+ }
+
+ /**
+ * <p>
+ * The callback interface needs to be implemented by {@link NMClientAsync}
+ * users. The APIs are called when responses from <code>NodeManager</code> are
+ * available.
+ * </p>
+ *
+ * <p>
+ * Once a callback happens, the users can choose to act on it in blocking or
+ * non-blocking manner. If the action on callback is done in a blocking
+ * manner, some of the threads performing requests on NodeManagers may get
+ * blocked depending on how many threads in the pool are busy.
+ * </p>
+ *
+ * <p>
+ * Implementations of the callback methods should not throw unexpected
+ * exceptions. If one does, {@link NMClientAsync} will just catch, log
+ * and then ignore it.
+ * </p>
+ */
+ public static interface CallbackHandler {
+ /**
+ * The API is called when <code>NodeManager</code> responds to indicate its
+ * acceptance of the starting container request
+ * @param containerId the Id of the container
+ * @param allServiceResponse a Map between the auxiliary service names and
+ * their outputs
+ */
+ void onContainerStarted(ContainerId containerId,
+ Map<String, ByteBuffer> allServiceResponse);
+
+ /**
+ * The API is called when <code>NodeManager</code> responds with the status
+ * of the container
+ * @param containerId the Id of the container
+ * @param containerStatus the status of the container
+ */
+ void onContainerStatusReceived(ContainerId containerId,
+ ContainerStatus containerStatus);
+
+ /**
+ * The API is called when <code>NodeManager</code> responds to indicate the
+ * container is stopped.
+ * @param containerId the Id of the container
+ */
+ void onContainerStopped(ContainerId containerId);
+
+ /**
+ * The API is called when an exception is raised in the process of
+ * starting a container
+ *
+ * @param containerId the Id of the container
+ * @param t the raised exception
+ */
+ void onStartContainerError(ContainerId containerId, Throwable t);
+
+ /**
+ * The API is called when an exception is raised in the process of
+ * querying the status of a container
+ *
+ * @param containerId the Id of the container
+ * @param t the raised exception
+ */
+ void onGetContainerStatusError(ContainerId containerId, Throwable t);
+
+ /**
+ * The API is called when an exception is raised in the process of
+ * stopping a container
+ *
+ * @param containerId the Id of the container
+ * @param t the raised exception
+ */
+ void onStopContainerError(ContainerId containerId, Throwable t);
+
+ }
+
+}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java?rev=1494017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java Tue Jun 18 04:02:47 2013
@@ -0,0 +1,342 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.async.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
+public class AMRMClientAsyncImpl<T extends ContainerRequest>
+extends AMRMClientAsync<T> {
+
+ private static final Log LOG = LogFactory.getLog(AMRMClientAsyncImpl.class);
+
+ private final HeartbeatThread heartbeatThread;
+ private final CallbackHandlerThread handlerThread;
+
+ private final BlockingQueue<AllocateResponse> responseQueue;
+
+ private final Object unregisterHeartbeatLock = new Object();
+
+ private volatile boolean keepRunning;
+ private volatile float progress;
+
+ private volatile Exception savedException;
+
+ /**
+ * Creates an asynchronous client for the given application attempt by
+ * wrapping a newly created {@link AMRMClientImpl} and delegating to the
+ * {@link AMRMClient}-based constructor.
+ *
+ * @param id the application attempt handed to the new {@link AMRMClientImpl}
+ * @param intervalMs heartbeat interval forwarded to the superclass
+ * constructor (presumably milliseconds, going by the name)
+ * @param callbackHandler handler forwarded to the superclass constructor
+ */
+ public AMRMClientAsyncImpl(ApplicationAttemptId id, int intervalMs,
+ CallbackHandler callbackHandler) {
+ this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
+ }
+
+ /**
+ * Creates an asynchronous client around an already constructed
+ * {@link AMRMClient}. Both worker threads are created here but neither is
+ * started; the response queue between them is unbounded.
+ *
+ * @param client the synchronous client that performs the actual calls
+ * @param intervalMs heartbeat interval forwarded to the superclass
+ * constructor
+ * @param callbackHandler handler forwarded to the superclass constructor
+ */
+ @Private
+ @VisibleForTesting
+ public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
+     CallbackHandler callbackHandler) {
+   super(client, intervalMs, callbackHandler);
+   this.responseQueue = new LinkedBlockingQueue<AllocateResponse>();
+   this.handlerThread = new CallbackHandlerThread();
+   this.heartbeatThread = new HeartbeatThread();
+   this.savedException = null;
+   this.keepRunning = true;
+ }
+
+ /**
+ * Initializes the superclass with the given configuration and then
+ * initializes the wrapped client with the same configuration.
+ *
+ * @param conf the configuration passed to both the superclass and the client
+ * @throws Exception declared by the overridden method; propagated from the
+ * superclass call
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ client.init(conf);
+ }
+
+ /**
+ * Starts the callback handler thread and the wrapped client, then delegates
+ * to the superclass. The heartbeat thread is not started here; it is started
+ * by <code>registerApplicationMaster</code>.
+ *
+ * @throws Exception declared by the overridden method; propagated from the
+ * superclass call
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+ handlerThread.start();
+ client.start();
+ super.serviceStart();
+ }
+
+ /**
+ * Tells the heartbeat and handler threads to stop and waits for them to
+ * terminate. Calling this method from the callback handler thread would
+ * deadlock (the thread would be joining itself), so that case is rejected
+ * with a {@link YarnRuntimeException}.
+ * <p>
+ * If the calling thread is interrupted while waiting for either thread, the
+ * remaining shutdown steps still run and the interrupt status is restored
+ * before this method returns, so the caller can observe the interruption.
+ *
+ * @throws YarnRuntimeException if called from the callback handler thread
+ * @throws Exception declared by the overridden method; propagated from the
+ * superclass call
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+   if (Thread.currentThread() == handlerThread) {
+     throw new YarnRuntimeException("Cannot call stop from callback handler thread!");
+   }
+   keepRunning = false;
+   // Remember interruptions instead of swallowing them. The flag is only
+   // re-asserted at the very end so it cannot disturb the later steps.
+   boolean interrupted = false;
+   try {
+     try {
+       heartbeatThread.join();
+     } catch (InterruptedException ex) {
+       LOG.error("Error joining with heartbeat thread", ex);
+       interrupted = true;
+     }
+     client.stop();
+     // Wake the handler thread in case it is blocked on the response queue.
+     handlerThread.interrupt();
+     try {
+       handlerThread.join();
+     } catch (InterruptedException ex) {
+       LOG.error("Error joining with handler thread", ex);
+       interrupted = true;
+     }
+     super.serviceStop();
+   } finally {
+     if (interrupted) {
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
+ /**
+ * Changes the heartbeat interval. The heartbeat thread reads this value
+ * each time it sleeps between <code>allocate</code> calls.
+ *
+ * @param interval the new interval in milliseconds
+ */
+ public void setHeartbeatInterval(int interval) {
+ heartbeatIntervalMs.set(interval);
+ }
+
+ /**
+ * Returns the outstanding requests that match the given parameters, as
+ * reported by the wrapped client. This method only delegates.
+ *
+ * @param priority the priority forwarded to the wrapped client
+ * @param resourceName the resource name forwarded to the wrapped client
+ * @param capability the capability forwarded to the wrapped client
+ * @return whatever the wrapped client's <code>getMatchingRequests</code>
+ * returns
+ */
+ public List<? extends Collection<T>> getMatchingRequests(
+ Priority priority,
+ String resourceName,
+ Resource capability) {
+ return client.getMatchingRequests(priority, resourceName, capability);
+ }
+
+ /**
+ * Registers this application master with the resource manager through the
+ * wrapped client. Once that call has returned normally, the heartbeat
+ * thread is started.
+ *
+ * @param appHostName host name forwarded to the wrapped client
+ * @param appHostPort port forwarded to the wrapped client
+ * @param appTrackingUrl tracking URL forwarded to the wrapped client
+ * @return the registration response obtained from the wrapped client
+ * @throws YarnException if the wrapped client's registration call throws it
+ * @throws IOException if the wrapped client's registration call throws it
+ */
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+     String appHostName, int appHostPort, String appTrackingUrl)
+     throws YarnException, IOException {
+   final RegisterApplicationMasterResponse registration =
+       client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
+   // Heartbeating only begins after registration did not throw.
+   heartbeatThread.start();
+   return registration;
+ }
+
+ /**
+ * Unregister the application master. This must be called in the end.
+ * <p>
+ * The stop flag is cleared and the unregister call is made while holding
+ * <code>unregisterHeartbeatLock</code>, the same lock the heartbeat thread
+ * holds around its <code>allocate</code> call, so no heartbeat is sent once
+ * unregistration has begun.
+ * @param appStatus Success/Failure status of the master
+ * @param appMessage Diagnostics message on failure
+ * @param appTrackingUrl New URL to get master info
+ * @throws YarnException if the wrapped client's unregister call throws it
+ * @throws IOException if the wrapped client's unregister call throws it
+ */
+ public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
+ String appMessage, String appTrackingUrl) throws YarnException,
+ IOException {
+ synchronized (unregisterHeartbeatLock) {
+ keepRunning = false;
+ client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
+ }
+ }
+
+ /**
+ * Adds a container request by delegating to the wrapped client. Requests
+ * should be added before the <code>allocate</code> call that is expected to
+ * carry them; in this class that call is made by the heartbeat thread.
+ * @param req Resource request
+ */
+ public void addContainerRequest(T req) {
+ client.addContainerRequest(req);
+ }
+
+ /**
+ * Remove a previous container request. The previous container request may
+ * have already been sent to the ResourceManager, so even after the remove
+ * request the app must be prepared to receive an allocation for it.
+ * @param req Resource request
+ */
+ public void removeContainerRequest(T req) {
+ client.removeContainerRequest(req);
+ }
+
+ /**
+ * Release containers assigned by the Resource Manager. If the app cannot use
+ * the container or wants to give up the container then it can release it.
+ * The app needs to make new requests for the released resource capability if
+ * it still needs it, e.g. if it released non-local resources.
+ * @param containerId the Id of the container to release
+ */
+ public void releaseAssignedContainer(ContainerId containerId) {
+ client.releaseAssignedContainer(containerId);
+ }
+
+ /**
+ * Get the currently available resources in the cluster, as reported by the
+ * wrapped client.
+ * A valid value is available after a call to allocate has been made.
+ * @return Currently available resources
+ */
+ public Resource getClusterAvailableResources() {
+ return client.getClusterAvailableResources();
+ }
+
+ /**
+ * Get the current number of nodes in the cluster, as reported by the
+ * wrapped client.
+ * A valid value is available after a call to allocate has been made.
+ * @return Current number of nodes in the cluster
+ */
+ public int getClusterNodeCount() {
+ return client.getClusterNodeCount();
+ }
+
+ /**
+ * Returns the NMTokens received on allocate calls. It will not communicate
+ * with the RM to get NMTokens. On an allocate call, whenever a new token is
+ * received along with a new container, the NMToken is cached per node
+ * manager. The returned map should be shared with any component that
+ * communicates with a NodeManager (e.g. NMClient / NMClientAsync) using
+ * NMTokens. If a new NMToken is received for the same node manager
+ * then it will be replaced.
+ * @return the map obtained from the wrapped client's
+ * <code>getNMTokens</code>
+ */
+ public ConcurrentMap<String, Token> getNMTokens() {
+ return client.getNMTokens();
+ }
+
+ private class HeartbeatThread extends Thread {
+ public HeartbeatThread() {
+ super("AMRM Heartbeater thread");
+ }
+
+ public void run() {
+ while (true) {
+ AllocateResponse response = null;
+ // synchronization ensures we don't send heartbeats after unregistering
+ synchronized (unregisterHeartbeatLock) {
+ if (!keepRunning) {
+ break;
+ }
+
+ try {
+ response = client.allocate(progress);
+ } catch (YarnException ex) {
+ LOG.error("Yarn exception on heartbeat", ex);
+ savedException = ex;
+ // interrupt handler thread in case it waiting on the queue
+ handlerThread.interrupt();
+ break;
+ } catch (IOException e) {
+ LOG.error("IO exception on heartbeat", e);
+ savedException = e;
+ // interrupt handler thread in case it waiting on the queue
+ handlerThread.interrupt();
+ break;
+ }
+ }
+ if (response != null) {
+ while (true) {
+ try {
+ responseQueue.put(response);
+ break;
+ } catch (InterruptedException ex) {
+ LOG.info("Interrupted while waiting to put on response queue", ex);
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(heartbeatIntervalMs.get());
+ } catch (InterruptedException ex) {
+ LOG.info("Heartbeater interrupted", ex);
+ }
+ }
+ }
+ }
+
+ /**
+ * Thread that takes responses off <code>responseQueue</code> and invokes the
+ * matching callbacks on the handler. It ends when <code>keepRunning</code>
+ * is cleared, after reporting a saved heartbeat exception through
+ * <code>onError</code>, or after the handler has been told about a shutdown
+ * request. An unrecognised AM command ends it with a
+ * {@link YarnRuntimeException}.
+ */
+ private class CallbackHandlerThread extends Thread {
+   public CallbackHandlerThread() {
+     super("AMRM Callback Handler Thread");
+   }
+
+   @Override
+   public void run() {
+     for (;;) {
+       if (!keepRunning) {
+         return;
+       }
+       if (savedException != null) {
+         LOG.error("Stopping callback due to: ", savedException);
+         handler.onError(savedException);
+         return;
+       }
+
+       AllocateResponse next;
+       try {
+         next = responseQueue.take();
+       } catch (InterruptedException ex) {
+         // Re-evaluate the stop flag and the saved exception.
+         LOG.info("Interrupted while waiting for queue", ex);
+         continue;
+       }
+
+       if (next.getAMCommand() != null) {
+         switch (next.getAMCommand()) {
+         case AM_RESYNC:
+         case AM_SHUTDOWN:
+           handler.onShutdownRequest();
+           LOG.info("Shutdown requested. Stopping callback.");
+           // should probably stop heartbeating also YARN-763
+           return;
+         default:
+           String msg =
+               "Unhandled value of AMCommand: " + next.getAMCommand();
+           LOG.error(msg);
+           throw new YarnRuntimeException(msg);
+         }
+       }
+
+       dispatch(next);
+       progress = handler.getProgress();
+     }
+   }
+
+   /** Invokes a handler callback for each non-empty list in the response. */
+   private void dispatch(AllocateResponse response) {
+     List<NodeReport> nodes = response.getUpdatedNodes();
+     if (!nodes.isEmpty()) {
+       handler.onNodesUpdated(nodes);
+     }
+
+     List<ContainerStatus> finished =
+         response.getCompletedContainersStatuses();
+     if (!finished.isEmpty()) {
+       handler.onContainersCompleted(finished);
+     }
+
+     List<Container> granted = response.getAllocatedContainers();
+     if (!granted.isEmpty()) {
+       handler.onContainersAllocated(granted);
+     }
+   }
+ }
+}