You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/05/29 03:41:42 UTC

svn commit: r1487184 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/ hadoop-yarn/hadoop-yarn-client/src...

Author: vinodkv
Date: Wed May 29 01:41:41 2013
New Revision: 1487184

URL: http://svn.apache.org/r1487184
Log:
YARN-422. Add a NM Client library to help application-writers. Contributed by Zhijie Shen.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1487184&r1=1487183&r2=1487184&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed May 29 01:41:41 2013
@@ -87,6 +87,9 @@ Release 2.0.5-beta - UNRELEASED
     Azure environments. (See breakdown of tasks below for subtasks and
     contributors)
 
+    YARN-422. Add a NM Client library to help application-writers. (Zhijie Shen
+    via vinodkv)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1487184&r1=1487183&r2=1487184&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed May 29 01:41:41 2013
@@ -708,6 +708,14 @@ public class YarnConfiguration extends C
   public static final long DEFAULT_YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS =
       1000;
 
+  /**
+   * Max number of threads in NMClientAsync to process container management
+   * events
+   */
+  public static final String NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE =
+      YARN_PREFIX + "client.nodemanager-client-async.thread-pool-max-size";
+  public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500;
+
   public YarnConfiguration() {
     super();
   }

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java?rev=1487184&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java Wed May 29 01:41:41 2013
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.Service;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface NMClient extends Service {
+
+  /**
+   * <p>Start an allocated container.</p>
+   *
+   * <p>The <code>ApplicationMaster</code> or other applications that use the
+   * client must provide the details of the allocated container, including the
+   * Id, the assigned node's Id and the token via {@link Container}. In
+   * addition, the AM needs to provide the {@link ContainerLaunchContext} as
+   * well.</p>
+   *
+   * @param container the allocated container
+   * @param containerLaunchContext the context information needed by the
+   *                               <code>NodeManager</code> to launch the
+   *                               container
+   * @return a map between the auxiliary service names and their outputs
+   * @throws YarnRemoteException
+   * @throws IOException
+   */
+  Map<String, ByteBuffer> startContainer(Container container,
+      ContainerLaunchContext containerLaunchContext)
+          throws YarnRemoteException, IOException;
+
+  /**
+   * <p>Stop a started container.</p>
+   *
+   * @param containerId the Id of the started container
+   * @param nodeId the Id of the <code>NodeManager</code>
+   * @param containerToken the security token to verify authenticity of the
+   *                       started container
+   * @throws YarnRemoteException
+   * @throws IOException
+   */
+  void stopContainer(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken) throws YarnRemoteException, IOException;
+
+  /**
+   * <p>Query the status of a container.</p>
+   *
+   * @param containerId the Id of the started container
+   * @param nodeId the Id of the <code>NodeManager</code>
+   * @param containerToken the security token to verify authenticity of the
+   *                       started container
+   * @return the status of a container
+   * @throws YarnRemoteException
+   * @throws IOException
+   */
+  ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken) throws YarnRemoteException, IOException;
+
+  /**
+   * <p>Set whether the containers that are started by this client, and are
+   * still running should be stopped when the client stops. By default, the
+   * feature should be enabled.</p>
+   *
+   * @param enabled whether the feature is enabled or not
+   */
+  void cleanupRunningContainersOnStop(boolean enabled);
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java?rev=1487184&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java Wed May 29 01:41:41 2013
@@ -0,0 +1,709 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * <code>NMClientAsync</code> handles communication with all the NodeManagers
+ * and provides asynchronous updates on getting responses from them. It
+ * maintains a thread pool to communicate with individual NMs where a number of
+ * worker threads process requests to NMs by using {@link NMClientImpl}. The max
+ * size of the thread pool is configurable through
+ * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
+ *
+ * It should be used in conjunction with a CallbackHandler. For example
+ *
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
+ *   public void onContainerStarted(ContainerId containerId,
+ *       Map<String, ByteBuffer> allServiceResponse) {
+ *     [post process after the container is started, process the response]
+ *   }
+ *
+ *   public void onContainerStatusReceived(ContainerId containerId,
+ *       ContainerStatus containerStatus) {
+ *     [make use of the status of the container]
+ *   }
+ *
+ *   public void onContainerStopped(ContainerId containerId) {
+ *     [post process after the container is stopped]
+ *   }
+ *
+ *   public void onStartContainerError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ *
+ *   public void onGetContainerStatusError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ *
+ *   public void onStopContainerError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ * }
+ * }
+ * </pre>
+ *
+ * The client's life-cycle should be managed like the following:
+ *
+ * <pre>
+ * {@code
+ * NMClientAsync asyncClient = new NMClientAsync(new MyCallbackHandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * asyncClient.startContainer(container, containerLaunchContext);
+ * [... wait for container being started]
+ * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
+ *     container.getContainerToken());
+ * [... handle the status in the callback instance]
+ * asyncClient.stopContainer(container.getId(), container.getNodeId(),
+ *     container.getContainerToken());
+ * [... wait for container being stopped]
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Unstable
+@Evolving
+public class NMClientAsync extends AbstractService {
+
+  private static final Log LOG = LogFactory.getLog(NMClientAsync.class);
+
+  protected static final int INITIAL_THREAD_POOL_SIZE = 10;
+
+  protected ThreadPoolExecutor threadPool;
+  protected int maxThreadPoolSize;
+  protected Thread eventDispatcherThread;
+  protected AtomicBoolean stopped = new AtomicBoolean(false);
+  protected BlockingQueue<ContainerEvent> events =
+      new LinkedBlockingQueue<ContainerEvent>();
+
+  protected NMClient client;
+  protected CallbackHandler callbackHandler;
+
+  protected ConcurrentMap<ContainerId, StatefulContainer> containers =
+      new ConcurrentHashMap<ContainerId, StatefulContainer>();
+
+  public NMClientAsync(CallbackHandler callbackHandler) {
+    this (NMClientAsync.class.getName(), callbackHandler);
+  }
+
+  public NMClientAsync(String name, CallbackHandler callbackHandler) {
+    this (name, new NMClientImpl(), callbackHandler);
+  }
+
+  @Private
+  @VisibleForTesting
+  protected NMClientAsync(String name, NMClient client,
+      CallbackHandler callbackHandler) {
+    super(name);
+    this.client = client;
+    this.callbackHandler = callbackHandler;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.maxThreadPoolSize = conf.getInt(
+        YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE,
+        YarnConfiguration.DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE);
+    LOG.info("Upper bound of the thread pool size is " + maxThreadPoolSize);
+
+    client.init(conf);
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    client.start();
+
+    ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+        this.getClass().getName() + " #%d").setDaemon(true).build();
+
+    // Start with a default core-pool size and change it dynamically.
+    int initSize = Math.min(INITIAL_THREAD_POOL_SIZE, maxThreadPoolSize);
+    threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1,
+        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+
+    eventDispatcherThread = new Thread() {
+      @Override
+      public void run() {
+        ContainerEvent event = null;
+        Set<String> allNodes = new HashSet<String>();
+
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            event = events.take();
+          } catch (InterruptedException e) {
+            if (!stopped.get()) {
+              LOG.error("Returning, thread interrupted", e);
+            }
+            return;
+          }
+
+          allNodes.add(event.getNodeId().toString());
+
+          int threadPoolSize = threadPool.getCorePoolSize();
+
+          // We can increase the pool size only if we haven't reached the
+          // maximum limit yet.
+          if (threadPoolSize != maxThreadPoolSize) {
+
+            // nodes where containers will run at *this* point of time. This is
+            // *not* the cluster size and doesn't need to be.
+            int nodeNum = allNodes.size();
+            int idealThreadPoolSize = Math.min(maxThreadPoolSize, nodeNum);
+
+            if (threadPoolSize < idealThreadPoolSize) {
+              // Bump up the pool size to idealThreadPoolSize +
+              // INITIAL_THREAD_POOL_SIZE; the latter is just a buffer so that
+              // we are not always increasing the pool size
+              int newThreadPoolSize = Math.min(maxThreadPoolSize,
+                  idealThreadPoolSize + INITIAL_THREAD_POOL_SIZE);
+              LOG.info("Set NMClientAsync thread pool size to " +
+                  newThreadPoolSize + " as the number of nodes to talk to is "
+                  + nodeNum);
+              threadPool.setCorePoolSize(newThreadPoolSize);
+            }
+          }
+
+          // the events from the queue are handled in parallel with a thread
+          // pool
+          threadPool.execute(getContainerEventProcessor(event));
+
+          // TODO: Group launching of multiple containers to a single
+          // NodeManager into a single connection
+        }
+      }
+    };
+    eventDispatcherThread.setName("Container  Event Dispatcher");
+    eventDispatcherThread.setDaemon(false);
+    eventDispatcherThread.start();
+
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
+    eventDispatcherThread.interrupt();
+    try {
+      eventDispatcherThread.join();
+    } catch (InterruptedException e) {
+      LOG.error("The thread of " + eventDispatcherThread.getName() +
+          " didn't finish normally.", e);
+    }
+    threadPool.shutdownNow();
+    // If NMClientImpl doesn't stop running containers, the states don't
+    // need to be cleared.
+    if (!(client instanceof NMClientImpl) ||
+        ((NMClientImpl) client).cleanupRunningContainers.get()) {
+      containers.clear();
+    }
+    client.stop();
+    super.stop();
+  }
+
+  public void startContainer(
+      Container container, ContainerLaunchContext containerLaunchContext) {
+    if (containers.putIfAbsent(container.getId(),
+        new StatefulContainer(this, container.getId())) != null) {
+      callbackHandler.onStartContainerError(container.getId(),
+          RPCUtil.getRemoteException("Container " + container.getId() +
+              " is already started or scheduled to start"));
+    }
+    try {
+      events.put(new StartContainerEvent(container, containerLaunchContext));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of starting Container " +
+          container.getId());
+      callbackHandler.onStartContainerError(container.getId(), e);
+    }
+  }
+
+  public void stopContainer(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken) {
+    if (containers.get(containerId) == null) {
+      callbackHandler.onStopContainerError(containerId,
+          RPCUtil.getRemoteException("Container " + containerId +
+              " is neither started nor scheduled to start"));
+    }
+    try {
+      events.put(new ContainerEvent(containerId, nodeId, containerToken,
+          ContainerEventType.STOP_CONTAINER));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of stopping Container " +
+          containerId);
+      callbackHandler.onStopContainerError(containerId, e);
+    }
+  }
+
+  public void getContainerStatus(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken) {
+    try {
+      events.put(new ContainerEvent(containerId, nodeId, containerToken,
+          ContainerEventType.QUERY_CONTAINER));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of querying the status" +
+          " of Container " + containerId);
+      callbackHandler.onGetContainerStatusError(containerId, e);
+    }
+  }
+
+  protected static enum ContainerState {
+    PREP, FAILED, RUNNING, DONE,
+  }
+
+  protected boolean isCompletelyDone(StatefulContainer container) {
+    return container.getState() == ContainerState.DONE ||
+        container.getState() == ContainerState.FAILED;
+  }
+
+  protected ContainerEventProcessor getContainerEventProcessor(
+      ContainerEvent event) {
+    return new ContainerEventProcessor(event);
+  }
+
+  /**
+   * The type of the event of interacting with a container
+   */
+  protected static enum ContainerEventType {
+    START_CONTAINER,
+    STOP_CONTAINER,
+    QUERY_CONTAINER
+  }
+
+  protected static class ContainerEvent
+      extends AbstractEvent<ContainerEventType>{
+    private ContainerId containerId;
+    private NodeId nodeId;
+    private ContainerToken containerToken;
+
+    public ContainerEvent(ContainerId containerId, NodeId nodeId,
+        ContainerToken containerToken, ContainerEventType type) {
+      super(type);
+      this.containerId = containerId;
+      this.nodeId = nodeId;
+      this.containerToken = containerToken;
+    }
+
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    public NodeId getNodeId() {
+      return nodeId;
+    }
+
+    public ContainerToken getContainerToken() {
+      return containerToken;
+    }
+  }
+
+  protected static class StartContainerEvent extends ContainerEvent {
+    private Container container;
+    private ContainerLaunchContext containerLaunchContext;
+
+    public StartContainerEvent(Container container,
+        ContainerLaunchContext containerLaunchContext) {
+      super(container.getId(), container.getNodeId(),
+          container.getContainerToken(), ContainerEventType.START_CONTAINER);
+      this.container = container;
+      this.containerLaunchContext = containerLaunchContext;
+    }
+
+    public Container getContainer() {
+      return container;
+    }
+
+    public ContainerLaunchContext getContainerLaunchContext() {
+      return containerLaunchContext;
+    }
+  }
+
+  protected static class StatefulContainer implements
+      EventHandler<ContainerEvent> {
+
+    protected final static StateMachineFactory<StatefulContainer,
+        ContainerState, ContainerEventType, ContainerEvent> stateMachineFactory
+            = new StateMachineFactory<StatefulContainer, ContainerState,
+                ContainerEventType, ContainerEvent>(ContainerState.PREP)
+
+            // Transitions from PREP state
+            .addTransition(ContainerState.PREP,
+                EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+                ContainerEventType.START_CONTAINER,
+                new StartContainerTransition())
+            .addTransition(ContainerState.PREP, ContainerState.DONE,
+                ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
+
+            // Transitions from RUNNING state
+            // RUNNING -> RUNNING should be an invalid transition
+            .addTransition(ContainerState.RUNNING,
+                EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
+                ContainerEventType.STOP_CONTAINER,
+                new StopContainerTransition())
+
+            // Transition from DONE state
+            .addTransition(ContainerState.DONE, ContainerState.DONE,
+                EnumSet.of(ContainerEventType.START_CONTAINER,
+                    ContainerEventType.STOP_CONTAINER))
+
+            // Transition from FAILED state
+            .addTransition(ContainerState.FAILED, ContainerState.FAILED,
+                EnumSet.of(ContainerEventType.START_CONTAINER,
+                    ContainerEventType.STOP_CONTAINER));
+
+    protected static class StartContainerTransition implements
+        MultipleArcTransition<StatefulContainer, ContainerEvent,
+        ContainerState> {
+
+      @Override
+      public ContainerState transition(
+          StatefulContainer container, ContainerEvent event) {
+        ContainerId containerId = event.getContainerId();
+        try {
+          StartContainerEvent scEvent = null;
+          if (event instanceof StartContainerEvent) {
+            scEvent = (StartContainerEvent) event;
+          }
+          assert scEvent != null;
+          Map<String, ByteBuffer> allServiceResponse =
+              container.nmClientAsync.client.startContainer(
+                  scEvent.getContainer(), scEvent.getContainerLaunchContext());
+          try {
+            container.nmClientAsync.callbackHandler.onContainerStarted(
+                containerId, allServiceResponse);
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            LOG.info("Unchecked exception is thrown from onContainerStarted for "
+                + "Container " + containerId, thr);
+          }
+          return ContainerState.RUNNING;
+        } catch (YarnRemoteException e) {
+          return onExceptionRaised(container, event, e);
+        } catch (IOException e) {
+          return onExceptionRaised(container, event, e);
+        } catch (Throwable t) {
+          return onExceptionRaised(container, event, t);
+        }
+      }
+
+      private ContainerState onExceptionRaised(StatefulContainer container,
+          ContainerEvent event, Throwable t) {
+        try {
+          container.nmClientAsync.callbackHandler.onStartContainerError(
+              event.getContainerId(), t);
+        } catch (Throwable thr) {
+          // Don't process user created unchecked exception
+          LOG.info(
+              "Unchecked exception is thrown from onStartContainerError for " +
+                  "Container " + event.getContainerId(), thr);
+        }
+        return ContainerState.FAILED;
+      }
+    }
+
+    protected static class StopContainerTransition implements
+        MultipleArcTransition<StatefulContainer, ContainerEvent,
+        ContainerState> {
+
+      @Override
+      public ContainerState transition(
+          StatefulContainer container, ContainerEvent event) {
+        ContainerId containerId = event.getContainerId();
+        try {
+          container.nmClientAsync.client.stopContainer(
+              containerId, event.getNodeId(), event.getContainerToken());
+          try {
+            container.nmClientAsync.callbackHandler.onContainerStopped(
+                event.getContainerId());
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            LOG.info("Unchecked exception is thrown from onContainerStopped for "
+                + "Container " + event.getContainerId(), thr);
+          }
+          return ContainerState.DONE;
+        } catch (YarnRemoteException e) {
+          return onExceptionRaised(container, event, e);
+        } catch (IOException e) {
+          return onExceptionRaised(container, event, e);
+        } catch (Throwable t) {
+          return onExceptionRaised(container, event, t);
+        }
+      }
+
+      private ContainerState onExceptionRaised(StatefulContainer container,
+          ContainerEvent event, Throwable t) {
+        try {
+          container.nmClientAsync.callbackHandler.onStopContainerError(
+              event.getContainerId(), t);
+        } catch (Throwable thr) {
+          // Don't process user created unchecked exception
+          LOG.info("Unchecked exception is thrown from onStopContainerError for "
+              + "Container " + event.getContainerId(), thr);
+        }
+        return ContainerState.FAILED;
+      }
+    }
+
+    protected static class OutOfOrderTransition implements
+        SingleArcTransition<StatefulContainer, ContainerEvent> {
+
+      protected static final String STOP_BEFORE_START_ERROR_MSG =
+          "Container was killed before it was launched";
+
+      @Override
+      public void transition(StatefulContainer container, ContainerEvent event) {
+        try {
+          container.nmClientAsync.callbackHandler.onStartContainerError(
+              event.getContainerId(),
+              RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
+        } catch (Throwable thr) {
+          // Don't process user created unchecked exception
+          LOG.info(
+              "Unchecked exception is thrown from onStartContainerError for " +
+                  "Container " + event.getContainerId(), thr);
+        }
+      }
+    }
+
+    private final NMClientAsync nmClientAsync;
+    private final ContainerId containerId;
+    private final StateMachine<ContainerState, ContainerEventType,
+        ContainerEvent> stateMachine;
+    private final ReadLock readLock;
+    private final WriteLock writeLock;
+
+    public StatefulContainer(NMClientAsync client, ContainerId containerId) {
+      this.nmClientAsync = client;
+      this.containerId = containerId;
+      stateMachine = stateMachineFactory.make(this);
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      readLock = lock.readLock();
+      writeLock = lock.writeLock();
+    }
+
+    @Override
+    public void handle(ContainerEvent event) {
+      writeLock.lock();
+      try {
+        try {
+          this.stateMachine.doTransition(event.getType(), event);
+        } catch (InvalidStateTransitonException e) {
+          LOG.error("Can't handle this event at current state", e);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    public ContainerState getState() {
+      readLock.lock();
+      try {
+        return stateMachine.getCurrentState();
+      } finally {
+        readLock.unlock();
+      }
+    }
+  }
+
+  protected class ContainerEventProcessor implements Runnable {
+    protected ContainerEvent event;
+
+    public ContainerEventProcessor(ContainerEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      ContainerId containerId = event.getContainerId();
+      LOG.info("Processing Event " + event + " for Container " + containerId);
+      if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
+        try {
+          ContainerStatus containerStatus = client.getContainerStatus(
+              containerId, event.getNodeId(), event.getContainerToken());
+          try {
+            callbackHandler.onContainerStatusReceived(
+                containerId, containerStatus);
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            LOG.info(
+                "Unchecked exception is thrown from onContainerStatusReceived" +
+                    " for Container " + event.getContainerId(), thr);
+          }
+        } catch (YarnRemoteException e) {
+          onExceptionRaised(containerId, e);
+        } catch (IOException e) {
+          onExceptionRaised(containerId, e);
+        } catch (Throwable t) {
+          onExceptionRaised(containerId, t);
+        }
+      } else {
+        StatefulContainer container = containers.get(containerId);
+        if (container == null) {
+          LOG.info("Container " + containerId + " is already stopped or failed");
+        } else {
+          container.handle(event);
+        }
+      }
+    }
+
+    private void onExceptionRaised(ContainerId containerId, Throwable t) {
+      try {
+        callbackHandler.onGetContainerStatusError(containerId, t);
+      } catch (Throwable thr) {
+        // Don't process user created unchecked exception
+        LOG.info("Unchecked exception is thrown from onGetContainerStatusError" +
+            " for Container " + containerId, thr);
+      }
+    }
+  }
+
+  /**
+   * <p>
+   * The callback interface needs to be implemented by {@link NMClientAsync}
+   * users. The APIs are called when responses from <code>NodeManager</code> are
+   * available.
+   * </p>
+   *
+   * <p>
+   * Once a callback happens, the users can choose to act on it in a blocking
+   * or non-blocking manner. If the action on callback is done in a blocking
+   * manner, some of the threads performing requests on NodeManagers may get
+   * blocked depending on how many threads in the pool are busy.
+   * </p>
+   *
+   * <p>
+   * The implementation of a callback function should not throw unexpected
+   * exceptions. Otherwise, {@link NMClientAsync} will just catch, log and
+   * then ignore them.
+   * </p>
+   */
+  public static interface CallbackHandler {
+    /**
+     * The API is called when <code>NodeManager</code> responds to indicate its
+     * acceptance of the starting container request
+     * @param containerId the Id of the container
+     * @param allServiceResponse a Map between the auxiliary service names and
+     *                           their outputs
+     */
+    void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse);
+
+    /**
+     * The API is called when <code>NodeManager</code> responds with the status
+     * of the container
+     * @param containerId the Id of the container
+     * @param containerStatus the status of the container
+     */
+    void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus);
+
+    /**
+     * The API is called when <code>NodeManager</code> responds to indicate the
+     * container is stopped.
+     * @param containerId the Id of the container
+     */
+    void onContainerStopped(ContainerId containerId);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * starting a container
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    void onStartContainerError(ContainerId containerId, Throwable t);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * querying the status of a container
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    void onGetContainerStatusError(ContainerId containerId, Throwable t);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * stopping a container
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    void onStopContainerError(ContainerId containerId, Throwable t);
+
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java?rev=1487184&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java Wed May 29 01:41:41 2013
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>
+ * This class implements {@link NMClient}. All the APIs are blocking.
+ * </p>
+ *
+ * <p>
+ * By default, this client stops all the running containers that it started
+ * when it stops. This can be disabled via
+ * {@link #cleanupRunningContainersOnStop}, in which case the containers
+ * continue to run after this client is stopped, until the application
+ * finishes, at which point the ResourceManager will forcefully kill them.
+ * </p>
+ */
+public class NMClientImpl extends AbstractService implements NMClient {
+
+  private static final Log LOG = LogFactory.getLog(NMClientImpl.class);
+
+  // Logically coherent operations on startedContainers are synchronized to
+  // ensure they are atomic
+  protected ConcurrentMap<ContainerId, StartedContainer> startedContainers =
+      new ConcurrentHashMap<ContainerId, StartedContainer>();
+
+  //enabled by default
+  protected AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+
+  /** Creates a client whose service name is this class's name. */
+  public NMClientImpl() {
+    this(NMClientImpl.class.getName());
+  }
+
+  /**
+   * Creates a client with the given service name.
+   *
+   * @param name the name passed to the superclass constructor
+   */
+  public NMClientImpl(String name) {
+    super(name);
+  }
+
+  @Override
+  public void stop() {
+    // Usually, started-containers are stopped when this client stops. Unless
+    // the flag cleanupRunningContainers is set to false.
+    if (cleanupRunningContainers.get()) {
+      cleanupRunningContainers();
+    }
+    super.stop();
+  }
+
+  /**
+   * Stops every container currently held in startedContainers. A failure to
+   * stop one container is logged together with its cause and does not keep
+   * the remaining containers from being stopped.
+   */
+  protected synchronized void cleanupRunningContainers() {
+    for (StartedContainer startedContainer : startedContainers.values()) {
+      try {
+        stopContainer(startedContainer.getContainerId(),
+            startedContainer.getNodeId(),
+            startedContainer.getContainerToken());
+      } catch (YarnRemoteException e) {
+        // Pass the exception to the logger so the cause is not lost
+        LOG.error("Failed to stop Container " +
+            startedContainer.getContainerId() +
+            " when stopping NMClientImpl", e);
+      } catch (IOException e) {
+        LOG.error("Failed to stop Container " +
+            startedContainer.getContainerId() +
+            " when stopping NMClientImpl", e);
+      }
+    }
+  }
+
+  /**
+   * Sets the flag that {@link #stop()} reads to decide whether the started
+   * containers are stopped together with this client.
+   *
+   * @param enabled the new value of the flag
+   */
+  @Override
+  public void cleanupRunningContainersOnStop(boolean enabled) {
+    cleanupRunningContainers.set(enabled);
+  }
+
+  /**
+   * Record kept for each container started through this client: its
+   * ContainerId, NodeId and ContainerToken, plus a flag that stopContainer
+   * sets once it has processed the container.
+   */
+  protected static class StartedContainer {
+    // Assigned once in the constructor and never changed afterwards
+    private final ContainerId containerId;
+    private final NodeId nodeId;
+    private final ContainerToken containerToken;
+    // Read and written by NMClientImpl#stopContainer inside a block
+    // synchronized on this object
+    private boolean stopped;
+
+    public StartedContainer(ContainerId containerId, NodeId nodeId,
+        ContainerToken containerToken) {
+      this.containerId = containerId;
+      this.nodeId = nodeId;
+      this.containerToken = containerToken;
+      stopped = false;
+    }
+
+    /** @return the Id of the container */
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    /** @return the NodeId given at construction */
+    public NodeId getNodeId() {
+      return nodeId;
+    }
+
+    /** @return the ContainerToken given at construction */
+    public ContainerToken getContainerToken() {
+      return containerToken;
+    }
+  }
+
+  protected static final class NMCommunicator extends AbstractService {
+    private ContainerId containerId;
+    private NodeId nodeId;
+    private ContainerToken containerToken;
+    private ContainerManager containerManager;
+
+    public NMCommunicator(ContainerId containerId, NodeId nodeId,
+        ContainerToken containerToken) {
+      super(NMCommunicator.class.getName());
+      this.containerId = containerId;
+      this.nodeId = nodeId;
+      this.containerToken = containerToken;
+    }
+
+    @Override
+    public synchronized void start() {
+      final YarnRPC rpc = YarnRPC.create(getConfig());
+
+      final InetSocketAddress containerAddress =
+          NetUtils.createSocketAddr(nodeId.toString());
+
+      // the user in createRemoteUser in this context has to be ContainerId
+      UserGroupInformation currentUser =
+          UserGroupInformation.createRemoteUser(containerId.toString());
+
+      Token<ContainerTokenIdentifier> token =
+          ProtoUtils.convertFromProtoFormat(containerToken, containerAddress);
+      currentUser.addToken(token);
+
+      containerManager = currentUser
+          .doAs(new PrivilegedAction<ContainerManager>() {
+            @Override
+            public ContainerManager run() {
+              return (ContainerManager) rpc.getProxy(ContainerManager.class,
+                  containerAddress, getConfig());
+            }
+          });
+
+      LOG.debug("Connecting to ContainerManager at " + containerAddress);
+    }
+
+    @Override
+    public synchronized void stop() {
+      if (this.containerManager != null) {
+        RPC.stopProxy(this.containerManager);
+
+        if (LOG.isDebugEnabled()) {
+          InetSocketAddress containerAddress =
+              NetUtils.createSocketAddr(nodeId.toString());
+          LOG.debug("Disconnecting from ContainerManager at " +
+              containerAddress);
+        }
+      }
+    }
+
+    public synchronized Map<String, ByteBuffer> startContainer(
+        Container container, ContainerLaunchContext containerLaunchContext)
+            throws YarnRemoteException, IOException {
+      if (!container.getId().equals(containerId)) {
+        throw new IllegalArgumentException(
+            "NMCommunicator's containerId  mismatches the given Container's");
+      }
+      StartContainerResponse startResponse = null;
+      try {
+        StartContainerRequest startRequest =
+            Records.newRecord(StartContainerRequest.class);
+        startRequest.setContainer(container);
+        startRequest.setContainerLaunchContext(containerLaunchContext);
+        startResponse = containerManager.startContainer(startRequest);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Started Container " + containerId);
+        }
+      } catch (YarnRemoteException e) {
+        LOG.warn("Container " + containerId + " failed to start", e);
+        throw e;
+      } catch (IOException e) {
+        LOG.warn("Container " + containerId + " failed to start", e);
+        throw e;
+      }
+      return startResponse.getAllServiceResponse();
+    }
+
+    public synchronized void stopContainer() throws YarnRemoteException,
+        IOException {
+      try {
+        StopContainerRequest stopRequest =
+            Records.newRecord(StopContainerRequest.class);
+        stopRequest.setContainerId(containerId);
+        containerManager.stopContainer(stopRequest);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Stopped Container " + containerId);
+        }
+      } catch (YarnRemoteException e) {
+        LOG.warn("Container " + containerId + " failed to stop", e);
+        throw e;
+      } catch (IOException e) {
+        LOG.warn("Container " + containerId + " failed to stop", e);
+        throw e;
+      }
+    }
+
+    public synchronized ContainerStatus getContainerStatus()
+        throws YarnRemoteException, IOException {
+      GetContainerStatusResponse statusResponse = null;
+      try {
+        GetContainerStatusRequest statusRequest =
+            Records.newRecord(GetContainerStatusRequest.class);
+        statusRequest.setContainerId(containerId);
+        statusResponse = containerManager.getContainerStatus(statusRequest);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got the status of Container " + containerId);
+        }
+      } catch (YarnRemoteException e) {
+        LOG.warn(
+            "Unable to get the status of Container " + containerId, e);
+        throw e;
+      } catch (IOException e) {
+        LOG.warn(
+            "Unable to get the status of Container " + containerId, e);
+        throw e;
+      }
+      return statusResponse.getStatus();
+    }
+  }
+
+  /**
+   * Registers the container in startedContainers and sends the start
+   * request through a fresh {@link NMCommunicator}. If the request fails,
+   * the container is removed from startedContainers again before the
+   * exception is propagated.
+   *
+   * Three choices for managing the proxy were considered:
+   * 1. starting and releasing the proxy before and after each interaction
+   * 2. starting the proxy when starting the container and releasing it when
+   * stopping the container
+   * 3. starting the proxy when starting the container and releasing it when
+   * stopping the client
+   * Choice 1 is adopted currently.
+   */
+  @Override
+  public Map<String, ByteBuffer> startContainer(
+      Container container, ContainerLaunchContext containerLaunchContext)
+          throws YarnRemoteException, IOException {
+    StartedContainer startedContainer = addStartedContainer(container);
+    // Do synchronization on StartedContainer to prevent race condition
+    // between startContainer and stopContainer
+    synchronized (startedContainer) {
+      NMCommunicator nmCommunicator = null;
+      try {
+        nmCommunicator = new NMCommunicator(container.getId(),
+            container.getNodeId(), container.getContainerToken());
+        nmCommunicator.init(getConfig());
+        nmCommunicator.start();
+        return nmCommunicator.startContainer(container,
+            containerLaunchContext);
+      } catch (YarnRemoteException e) {
+        // Remove the started container if it failed to start
+        removeStartedContainer(container.getId());
+        throw e;
+      } catch (IOException e) {
+        removeStartedContainer(container.getId());
+        throw e;
+      } catch (Throwable t) {
+        removeStartedContainer(container.getId());
+        throw RPCUtil.getRemoteException(t);
+      } finally {
+        // The proxy is released after every interaction
+        if (nmCommunicator != null) {
+          nmCommunicator.stop();
+        }
+      }
+    }
+  }
+
+  /**
+   * Stops a container that was started through this client.
+   *
+   * The container must be present in startedContainers; otherwise an
+   * exception built by RPCUtil.getRemoteException is thrown. The stop
+   * request is sent while synchronized on the container's StartedContainer
+   * entry, and a caller that finds the entry already marked as stopped
+   * returns without sending anything.
+   *
+   * @param containerId the Id of the container to stop
+   * @param nodeId the NodeId handed to the NMCommunicator
+   * @param containerToken the token handed to the NMCommunicator
+   */
+  @Override
+  public void stopContainer(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken) throws YarnRemoteException, IOException {
+    StartedContainer startedContainer = getStartedContainer(containerId);
+    if (startedContainer == null) {
+      throw RPCUtil.getRemoteException("Container " + containerId +
+          " is either not started yet or already stopped");
+    }
+    // Only allow one request of stopping the container to move forward
+    // When entering the block, check whether the precursor has already stopped
+    // the container
+    synchronized (startedContainer) {
+      if (startedContainer.stopped) {
+        return;
+      }
+      NMCommunicator nmCommunicator = null;
+      try {
+        nmCommunicator =
+            new NMCommunicator(containerId, nodeId, containerToken);
+        nmCommunicator.init(getConfig());
+        nmCommunicator.start();
+        nmCommunicator.stopContainer();
+      } finally {
+        if (nmCommunicator != null) {
+          nmCommunicator.stop();
+        }
+        // Runs whether or not the stop request succeeded: the entry is
+        // marked as stopped and dropped from startedContainers either way
+        startedContainer.stopped = true;
+        removeStartedContainer(containerId);
+      }
+    }
+  }
+
+  /**
+   * Queries the status of a container through a fresh
+   * {@link NMCommunicator}, which is stopped again before returning.
+   *
+   * @param containerId the Id of the container to query
+   * @param nodeId the NodeId handed to the NMCommunicator
+   * @param containerToken the token handed to the NMCommunicator
+   * @return the status returned by the NMCommunicator
+   */
+  @Override
+  public ContainerStatus getContainerStatus(ContainerId containerId,
+      NodeId nodeId, ContainerToken containerToken)
+          throws YarnRemoteException, IOException {
+    NMCommunicator nmCommunicator = null;
+    try {
+      nmCommunicator = new NMCommunicator(containerId, nodeId, containerToken);
+      nmCommunicator.init(getConfig());
+      nmCommunicator.start();
+      return nmCommunicator.getContainerStatus();
+    } finally {
+      if (nmCommunicator != null) {
+        nmCommunicator.stop();
+      }
+    }
+  }
+
+  /**
+   * Adds a new entry for the container to startedContainers.
+   *
+   * @param container the container being started
+   * @return the newly created entry
+   * @throws YarnRemoteException if the container's Id is already a key of
+   *                             startedContainers
+   */
+  protected synchronized StartedContainer addStartedContainer(
+      Container container) throws YarnRemoteException, IOException {
+    ContainerId id = container.getId();
+    if (startedContainers.containsKey(id)) {
+      throw RPCUtil.getRemoteException("Container " + id +
+          " is already started");
+    }
+    StartedContainer startedContainer = new StartedContainer(id,
+        container.getNodeId(), container.getContainerToken());
+    startedContainers.put(id, startedContainer);
+    return startedContainer;
+  }
+
+  /**
+   * Removes the entry for the given container from startedContainers, if
+   * there is one.
+   *
+   * @param containerId the Id of the container to forget
+   */
+  protected synchronized void removeStartedContainer(ContainerId containerId) {
+    startedContainers.remove(containerId);
+  }
+
+  /**
+   * Looks up the entry for the given container in startedContainers.
+   *
+   * @param containerId the Id of the container to look up
+   * @return the entry, or null if the map has none for this Id
+   */
+  protected synchronized StartedContainer getStartedContainer(
+      ContainerId containerId) {
+    return startedContainers.get(containerId);
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java?rev=1487184&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java Wed May 29 01:41:41 2013
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNMClient {
+  Configuration conf = null;
+  MiniYARNCluster yarnCluster = null;
+  YarnClientImpl yarnClient = null;
+  AMRMClientImpl rmClient = null;
+  NMClientImpl nmClient = null;
+  List<NodeReport> nodeReports = null;
+  ApplicationAttemptId attemptId = null;
+  int nodeCount = 3;
+
+  /**
+   * Starts a mini YARN cluster, submits an unmanaged-AM application, waits
+   * up to 30 polls for it to reach the ACCEPTED state, and then starts the
+   * AM-RM client and the NM client used by the test.
+   */
+  @Before
+  public void setup() throws YarnRemoteException, IOException {
+    // start minicluster
+    conf = new YarnConfiguration();
+    // Name the cluster after this test rather than TestAMRMClient
+    yarnCluster =
+        new MiniYARNCluster(TestNMClient.class.getName(), nodeCount, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+    assertNotNull(yarnCluster);
+    assertEquals(STATE.STARTED, yarnCluster.getServiceState());
+
+    // start rm client
+    yarnClient = new YarnClientImpl();
+    yarnClient.init(conf);
+    yarnClient.start();
+    assertNotNull(yarnClient);
+    assertEquals(STATE.STARTED, yarnClient.getServiceState());
+
+    // get node info
+    nodeReports = yarnClient.getNodeReports();
+
+    // submit new app
+    GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+    ApplicationId appId = newApp.getApplicationId();
+
+    ApplicationSubmissionContext appContext = Records
+        .newRecord(ApplicationSubmissionContext.class);
+    // set the application id
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName("Test");
+    // Set the priority for the application master
+    Priority pri = Priority.newInstance(0);
+    appContext.setPriority(pri);
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue("default");
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records
+        .newRecord(ContainerLaunchContext.class);
+    appContext.setAMContainerSpec(amContainer);
+    // unmanaged AM
+    appContext.setUnmanagedAM(true);
+    // Create the request to send to the applications manager
+    // NOTE(review): appRequest is built but never used below;
+    // submitApplication is given appContext directly
+    SubmitApplicationRequest appRequest = Records
+        .newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+    // Submit the application to the applications manager
+    yarnClient.submitApplication(appContext);
+
+    // wait for app to start
+    int iterationsLeft = 30;
+    while (iterationsLeft > 0) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState() ==
+          YarnApplicationState.ACCEPTED) {
+        attemptId = appReport.getCurrentApplicationAttemptId();
+        break;
+      }
+      sleep(1000);
+      --iterationsLeft;
+    }
+    if (iterationsLeft == 0) {
+      fail("Application hasn't been started");
+    }
+
+    // start am rm client
+    rmClient = new AMRMClientImpl(attemptId);
+    rmClient.init(conf);
+    rmClient.start();
+    assertNotNull(rmClient);
+    assertEquals(STATE.STARTED, rmClient.getServiceState());
+
+    // start am nm client
+    nmClient = new NMClientImpl();
+    nmClient.init(conf);
+    nmClient.start();
+    assertNotNull(nmClient);
+    assertEquals(STATE.STARTED, nmClient.getServiceState());
+  }
+
+  /**
+   * Stops the clients and the cluster, and along the way checks both
+   * settings of cleanupRunningContainersOnStop: stopping with the flag off
+   * must leave startedContainers non-empty, stopping again with the flag on
+   * must empty it.
+   */
+  @After
+  public void tearDown() {
+    rmClient.stop();
+
+    // leave one unclosed
+    assertEquals(1, nmClient.startedContainers.size());
+    // default true
+    assertTrue(nmClient.cleanupRunningContainers.get());
+    // don't stop the running containers
+    nmClient.cleanupRunningContainersOnStop(false);
+    assertFalse(nmClient.cleanupRunningContainers.get());
+    nmClient.stop();
+    assertTrue(nmClient.startedContainers.size() > 0);
+    // stop the running containers
+    // (stop() is called a second time, now with the flag switched back on)
+    nmClient.cleanupRunningContainersOnStop(true);
+    assertTrue(nmClient.cleanupRunningContainers.get());
+    nmClient.stop();
+    assertEquals(0, nmClient.startedContainers.size());
+
+    yarnClient.stop();
+    yarnCluster.stop();
+  }
+
+  /**
+   * Registers the application master, requests five containers, runs the
+   * container management checks on whatever was allocated, and unregisters.
+   */
+  @Test (timeout = 60000)
+  public void testNMClient()
+      throws YarnRemoteException, IOException {
+
+    rmClient.registerApplicationMaster("Host", 10000, "");
+
+    Set<Container> allocated = allocateContainers(rmClient, 5);
+    testContainerManagement(nmClient, allocated);
+
+    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+        null, null);
+  }
+
+  /**
+   * Adds {@code num} container requests for the first reported node and
+   * calls allocate() at most twice to collect the allocated containers.
+   *
+   * @param rmClient the AM-RM client used to request and allocate
+   * @param num the number of container requests to add
+   * @return the containers returned by the allocate() calls
+   */
+  private Set<Container> allocateContainers(AMRMClientImpl rmClient, int num)
+      throws YarnRemoteException, IOException {
+    // setup container request
+    Resource capability = Resource.newInstance(1024, 0);
+    Priority priority = Priority.newInstance(0);
+    String node = nodeReports.get(0).getNodeId().getHost();
+    String rack = nodeReports.get(0).getRackName();
+    String[] nodes = new String[] {node};
+    String[] racks = new String[] {rack};
+
+    int requested = 0;
+    while (requested < num) {
+      rmClient.addContainerRequest(new ContainerRequest(capability, nodes,
+          racks, priority, 1));
+      ++requested;
+    }
+
+    int containersRequestedAny = rmClient.remoteRequestsTable.get(priority)
+        .get(ResourceRequest.ANY).get(capability).getNumContainers();
+
+    // RM should allocate container within 2 calls to allocate()
+    Set<Container> containers = new TreeSet<Container>();
+    int allocatedContainerCount = 0;
+    for (int iterationsLeft = 2;
+        allocatedContainerCount < containersRequestedAny
+            && iterationsLeft > 0;
+        --iterationsLeft) {
+      AllocateResponse allocResponse = rmClient.allocate(0.1f);
+
+      allocatedContainerCount += allocResponse.getAllocatedContainers().size();
+      containers.addAll(allocResponse.getAllocatedContainers());
+      if (allocatedContainerCount < containersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(1000);
+      }
+    }
+    return containers;
+  }
+
+  /**
+   * Runs the start / status / stop cycle on every container except the last
+   * one, which is started and then left running.
+   *
+   * For each container it first checks that querying and stopping before
+   * the start both fail with the expected messages. Unexpected exceptions
+   * are included in the failure message so their cause is visible.
+   *
+   * @param nmClient the client under test
+   * @param containers the allocated containers to exercise
+   */
+  private void testContainerManagement(NMClientImpl nmClient,
+      Set<Container> containers) throws IOException {
+    int size = containers.size();
+    int i = 0;
+    for (Container container : containers) {
+      // getContainerStatus shouldn't be called before startContainer,
+      // otherwise, NodeManager cannot find the container
+      try {
+        nmClient.getContainerStatus(container.getId(), container.getNodeId(),
+            container.getContainerToken());
+        fail("Exception is expected");
+      } catch (YarnRemoteException e) {
+        assertTrue("The thrown exception is not expected",
+            e.getMessage().contains("is not handled by this NodeManager"));
+      }
+
+      // stopContainer shouldn't be called before startContainer,
+      // otherwise, an exception will be thrown
+      try {
+        nmClient.stopContainer(container.getId(), container.getNodeId(),
+            container.getContainerToken());
+        fail("Exception is expected");
+      } catch (YarnRemoteException e) {
+        assertTrue("The thrown exception is not expected",
+            e.getMessage().contains(
+                "is either not started yet or already stopped"));
+      }
+
+      Credentials ts = new Credentials();
+      DataOutputBuffer dob = new DataOutputBuffer();
+      ts.writeTokenStorageToStream(dob);
+      ByteBuffer securityTokens =
+          ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      ContainerLaunchContext clc =
+          Records.newRecord(ContainerLaunchContext.class);
+      clc.setTokens(securityTokens);
+      try {
+        nmClient.startContainer(container, clc);
+      } catch (YarnRemoteException e) {
+        fail("Exception is not expected: " + e);
+      }
+
+      // leave one container unclosed
+      if (++i < size) {
+        try {
+          ContainerStatus status = nmClient.getContainerStatus(container.getId(),
+              container.getNodeId(), container.getContainerToken());
+          // verify the container is started and in good shape
+          assertEquals(container.getId(), status.getContainerId());
+          assertEquals(ContainerState.RUNNING, status.getState());
+          assertEquals("", status.getDiagnostics());
+          assertEquals(-1000, status.getExitStatus());
+        } catch (YarnRemoteException e) {
+          fail("Exception is not expected: " + e);
+        }
+
+        try {
+          nmClient.stopContainer(container.getId(), container.getNodeId(),
+              container.getContainerToken());
+        } catch (YarnRemoteException e) {
+          fail("Exception is not expected: " + e);
+        }
+
+        // getContainerStatus can be called after stopContainer
+        try {
+          ContainerStatus status = nmClient.getContainerStatus(
+              container.getId(), container.getNodeId(),
+              container.getContainerToken());
+          assertEquals(container.getId(), status.getContainerId());
+          // The reported state is still expected to be RUNNING here; only
+          // the diagnostics show that the container was killed
+          assertEquals(ContainerState.RUNNING, status.getState());
+          assertTrue("" + i, status.getDiagnostics().contains(
+              "Container killed by the ApplicationMaster."));
+          assertEquals(-1000, status.getExitStatus());
+        } catch (YarnRemoteException e) {
+          fail("Exception is not expected: " + e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Sleeps for the given time. If the thread is interrupted, the interrupt
+   * flag is restored so that callers and later blocking calls can see it.
+   *
+   * @param sleepTime the time to sleep, in milliseconds
+   */
+  private void sleep(int sleepTime) {
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException e) {
+      // Thread.sleep clears the flag when it throws; set it again
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java?rev=1487184&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java Wed May 29 01:41:41 2013
@@ -0,0 +1,540 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+
+
+public class TestNMClientAsync {
+
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  private NMClientAsync asyncClient;
+  private NodeId nodeId;
+  private ContainerToken containerToken;
+
+  /**
+   * Drives {@link NMClientAsync} through three phases and verifies that every
+   * expected callback is invoked and that exceptions thrown by callbacks are
+   * ignored by the client:
+   * <ol>
+   *   <li>containers [0, expectedSuccess): start, query and stop all
+   *       succeed (mock mode 0);</li>
+   *   <li>containers [expectedSuccess, expectedSuccess + expectedFailure):
+   *       start and query fail (mock mode 1);</li>
+   *   <li>a further expectedFailure containers: start succeeds but stop
+   *       fails (mock mode 2).</li>
+   * </ol>
+   * The client is always stopped, even when a wait or assertion fails, so
+   * that its dispatcher thread and thread pool do not outlive the test.
+   */
+  @Test (timeout = 30000)
+  public void testNMClientAsync() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10);
+
+    // Threads to run are more than the max size of the thread pool
+    int expectedSuccess = 40;
+    int expectedFailure = 40;
+
+    asyncClient = new MockNMClientAsync1(expectedSuccess, expectedFailure);
+    asyncClient.init(conf);
+    Assert.assertEquals("The max thread pool size is not correctly set",
+        10, asyncClient.maxThreadPoolSize);
+    asyncClient.start();
+
+    try {
+      TestCallbackHandler1 handler =
+          (TestCallbackHandler1) asyncClient.callbackHandler;
+      MockNMClientAsync1 mockAsyncClient = (MockNMClientAsync1) asyncClient;
+
+      for (int i = 0; i < expectedSuccess + expectedFailure; ++i) {
+        if (i == expectedSuccess) {
+          // Wait for the whole success phase to drain before swapping in the
+          // client whose calls all throw.
+          while (!handler.isAllSuccessCallsExecuted()) {
+            Thread.sleep(10);
+          }
+          asyncClient.client = mockNMClient(1);
+        }
+        Container container = mockContainer(i);
+        ContainerLaunchContext clc =
+            recordFactory.newRecordInstance(ContainerLaunchContext.class);
+        asyncClient.startContainer(container, clc);
+      }
+      while (!handler.isStartAndQueryFailureCallsExecuted()) {
+        Thread.sleep(10);
+      }
+
+      // Last phase: start succeeds, stop fails.
+      asyncClient.client = mockNMClient(2);
+      handler.path = false;
+      for (int i = 0; i < expectedFailure; ++i) {
+        Container container = mockContainer(
+            expectedSuccess + expectedFailure + i);
+        ContainerLaunchContext clc =
+            recordFactory.newRecordInstance(ContainerLaunchContext.class);
+        asyncClient.startContainer(container, clc);
+      }
+      while (!handler.isStopFailureCallsExecuted()) {
+        Thread.sleep(10);
+      }
+
+      for (String errorMsg : handler.errorMsgs) {
+        System.out.println(errorMsg);
+      }
+      Assert.assertEquals("Error occurs in CallbackHandler", 0,
+          handler.errorMsgs.size());
+      for (String errorMsg : mockAsyncClient.errorMsgs) {
+        System.out.println(errorMsg);
+      }
+      Assert.assertEquals("Error occurs in ContainerEventProcessor", 0,
+          mockAsyncClient.errorMsgs.size());
+    } finally {
+      // Runs on success and on failure/timeout alike.
+      asyncClient.stop();
+    }
+    Assert.assertFalse(
+        "The thread of Container Management Event Dispatcher is still alive",
+        asyncClient.eventDispatcherThread.isAlive());
+    Assert.assertTrue("The thread pool is not shut down",
+        asyncClient.threadPool.isShutdown());
+  }
+
+  private class MockNMClientAsync1 extends NMClientAsync {
+    private Set<String> errorMsgs =
+        Collections.synchronizedSet(new HashSet<String>());
+
+    protected MockNMClientAsync1(int expectedSuccess, int expectedFailure)
+        throws YarnRemoteException, IOException {
+      super(MockNMClientAsync1.class.getName(), mockNMClient(0),
+          new TestCallbackHandler1(expectedSuccess, expectedFailure));
+    }
+
+    private class MockContainerEventProcessor extends ContainerEventProcessor {
+      public MockContainerEventProcessor(ContainerEvent event) {
+        super(event);
+      }
+
+      @Override
+      public void run() {
+        try {
+          super.run();
+        } catch (RuntimeException e) {
+          // If the unexpected throwable comes from error callback functions, it
+          // will break ContainerEventProcessor.run(). Therefore, monitor
+          // the exception here
+          errorMsgs.add("Unexpected throwable from callback functions should" +
+              " be ignored by Container " + event.getContainerId());
+        }
+      }
+    }
+
+    @Override
+    protected ContainerEventProcessor getContainerEventProcessor(
+        ContainerEvent event) {
+      return new MockContainerEventProcessor(event);
+    }
+  }
+
+  /**
+   * Callback handler used by {@link #testNMClientAsync()}.
+   *
+   * It counts each kind of callback, remembers per-container which callback
+   * fired, chains the next asynchronous request from inside the callback, and
+   * collects an error message whenever a callback fires for a container id
+   * outside the range expected for it. Most callbacks finish by throwing a
+   * {@link RuntimeException} on purpose: the client under test is expected to
+   * ignore it.
+   */
+  private class TestCallbackHandler1
+      implements NMClientAsync.CallbackHandler {
+
+    // true: onContainerStarted validates the id and chains a status query;
+    // false: onContainerStarted immediately requests a stop.
+    // The flag is flipped by the test thread and read inside callbacks
+    // (presumably on NMClientAsync's worker threads), so it must be volatile
+    // for the update to be guaranteed visible.
+    private volatile boolean path = true;
+
+    private final int expectedSuccess;
+    private final int expectedFailure;
+
+    private final AtomicInteger actualStartSuccess = new AtomicInteger(0);
+    private final AtomicInteger actualStartFailure = new AtomicInteger(0);
+    private final AtomicInteger actualQuerySuccess = new AtomicInteger(0);
+    private final AtomicInteger actualQueryFailure = new AtomicInteger(0);
+    private final AtomicInteger actualStopSuccess = new AtomicInteger(0);
+    private final AtomicInteger actualStopFailure = new AtomicInteger(0);
+
+    // One slot per container; a slot is set to 1 once its callback has fired.
+    private final AtomicIntegerArray actualStartSuccessArray;
+    private final AtomicIntegerArray actualStartFailureArray;
+    private final AtomicIntegerArray actualQuerySuccessArray;
+    private final AtomicIntegerArray actualQueryFailureArray;
+    private final AtomicIntegerArray actualStopSuccessArray;
+    private final AtomicIntegerArray actualStopFailureArray;
+
+    private final Set<String> errorMsgs =
+        Collections.synchronizedSet(new HashSet<String>());
+
+    /**
+     * @param expectedSuccess number of containers whose calls all succeed;
+     *                        they use ids [0, expectedSuccess)
+     * @param expectedFailure number of containers in each failure phase
+     */
+    public TestCallbackHandler1(int expectedSuccess, int expectedFailure) {
+      this.expectedSuccess = expectedSuccess;
+      this.expectedFailure = expectedFailure;
+
+      actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess);
+      actualStartFailureArray = new AtomicIntegerArray(expectedFailure);
+      actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess);
+      actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
+      actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
+      actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      if (path) {
+        if (containerId.getId() >= expectedSuccess) {
+          errorMsgs.add("Container " + containerId +
+              " should throw the exception onContainerStarted");
+          return;
+        }
+        actualStartSuccess.incrementAndGet();
+        actualStartSuccessArray.set(containerId.getId(), 1);
+
+        // move on to the following success tests
+        asyncClient.getContainerStatus(containerId, nodeId, containerToken);
+      } else {
+        // move on to the following failure tests
+        asyncClient.stopContainer(containerId, nodeId, containerToken);
+      }
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      if (containerId.getId() >= expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " should throw the exception onContainerStatusReceived");
+        return;
+      }
+      actualQuerySuccess.incrementAndGet();
+      actualQuerySuccessArray.set(containerId.getId(), 1);
+      // move on to the following success tests
+      asyncClient.stopContainer(containerId, nodeId, containerToken);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      if (containerId.getId() >= expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " should throw the exception onContainerStopped");
+        return;
+      }
+      actualStopSuccess.incrementAndGet();
+      actualStopSuccessArray.set(containerId.getId(), 1);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      // If the unexpected throwable comes from success callback functions, it
+      // will be handled by the error callback functions. Therefore, monitor
+      // the exception here
+      if (t instanceof RuntimeException) {
+        errorMsgs.add("Unexpected throwable from callback functions should be" +
+            " ignored by Container " + containerId);
+      }
+      if (containerId.getId() < expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onStartContainerError");
+        return;
+      }
+      actualStartFailure.incrementAndGet();
+      actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
+      // move on to the following failure tests
+      asyncClient.getContainerStatus(containerId, nodeId, containerToken);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      if (t instanceof RuntimeException) {
+        errorMsgs.add("Unexpected throwable from callback functions should be" +
+            " ignored by Container " + containerId);
+      }
+      // Stop failures are only expected for the last batch of containers.
+      if (containerId.getId() < expectedSuccess + expectedFailure) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onStopContainerError");
+        return;
+      }
+
+      actualStopFailure.incrementAndGet();
+      actualStopFailureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @Override
+    public void onGetContainerStatusError(ContainerId containerId,
+        Throwable t) {
+      if (t instanceof RuntimeException) {
+        errorMsgs.add("Unexpected throwable from callback functions should be"
+            + " ignored by Container " + containerId);
+      }
+      if (containerId.getId() < expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onGetContainerStatusError");
+        return;
+      }
+      actualQueryFailure.incrementAndGet();
+      actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1);
+
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    /**
+     * @return true once start, query and stop success callbacks have each
+     *         fired expectedSuccess times; in that case every per-container
+     *         slot is also asserted to be set
+     */
+    public boolean isAllSuccessCallsExecuted() {
+      boolean isAllSuccessCallsExecuted =
+          actualStartSuccess.get() == expectedSuccess &&
+          actualQuerySuccess.get() == expectedSuccess &&
+          actualStopSuccess.get() == expectedSuccess;
+      if (isAllSuccessCallsExecuted) {
+        assertAtomicIntegerArray(actualStartSuccessArray);
+        assertAtomicIntegerArray(actualQuerySuccessArray);
+        assertAtomicIntegerArray(actualStopSuccessArray);
+      }
+      return isAllSuccessCallsExecuted;
+    }
+
+    /**
+     * @return true once start and query error callbacks have each fired
+     *         expectedFailure times; the per-container slots are then
+     *         asserted as well
+     */
+    public boolean isStartAndQueryFailureCallsExecuted() {
+      boolean isStartAndQueryFailureCallsExecuted =
+          actualStartFailure.get() == expectedFailure &&
+          actualQueryFailure.get() == expectedFailure;
+      if (isStartAndQueryFailureCallsExecuted) {
+        assertAtomicIntegerArray(actualStartFailureArray);
+        assertAtomicIntegerArray(actualQueryFailureArray);
+      }
+      return isStartAndQueryFailureCallsExecuted;
+    }
+
+    /**
+     * @return true once the stop error callback has fired expectedFailure
+     *         times; the per-container slots are then asserted as well
+     */
+    public boolean isStopFailureCallsExecuted() {
+      boolean isStopFailureCallsExecuted =
+          actualStopFailure.get() == expectedFailure;
+      if (isStopFailureCallsExecuted) {
+        assertAtomicIntegerArray(actualStopFailureArray);
+      }
+      return isStopFailureCallsExecuted;
+    }
+
+    /** Asserts that every slot of {@code array} has been set to 1. */
+    private void assertAtomicIntegerArray(AtomicIntegerArray array) {
+      for (int i = 0; i < array.length(); ++i) {
+        Assert.assertEquals(1, array.get(i));
+      }
+    }
+  }
+
+  /**
+   * Builds a Mockito mock of {@link NMClient} with one of three behaviors.
+   *
+   * @param mode 0: start, query and stop all succeed;
+   *             1: start, query and stop all throw a remote exception;
+   *             2: start and query succeed, stop throws a remote exception
+   * @return the stubbed mock
+   * @throws IllegalArgumentException if {@code mode} is not 0, 1 or 2
+   */
+  private NMClient mockNMClient(int mode)
+      throws YarnRemoteException, IOException {
+    NMClient client = mock(NMClient.class);
+    switch (mode) {
+      case 0:
+        when(client.startContainer(any(Container.class),
+            any(ContainerLaunchContext.class))).thenReturn(
+                Collections.<String, ByteBuffer>emptyMap());
+        when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
+            any(ContainerToken.class))).thenReturn(
+                recordFactory.newRecordInstance(ContainerStatus.class));
+        doNothing().when(client).stopContainer(any(ContainerId.class),
+            any(NodeId.class), any(ContainerToken.class));
+        break;
+      case 1:
+        doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
+            .startContainer(any(Container.class),
+                any(ContainerLaunchContext.class));
+        doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
+            .getContainerStatus(any(ContainerId.class), any(NodeId.class),
+                any(ContainerToken.class));
+        doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
+            .stopContainer(any(ContainerId.class), any(NodeId.class),
+                any(ContainerToken.class));
+        break;
+      case 2:
+        when(client.startContainer(any(Container.class),
+            any(ContainerLaunchContext.class))).thenReturn(
+                Collections.<String, ByteBuffer>emptyMap());
+        when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
+            any(ContainerToken.class))).thenReturn(
+                recordFactory.newRecordInstance(ContainerStatus.class));
+        doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
+            .stopContainer(any(ContainerId.class), any(NodeId.class),
+                any(ContainerToken.class));
+        break;
+      default:
+        // An unstubbed mock would make the test misbehave in confusing ways;
+        // reject unknown modes up front.
+        throw new IllegalArgumentException("Unknown mock mode: " + mode);
+    }
+    return client;
+  }
+
+  @Test (timeout = 10000)
+  public void testOutOfOrder() throws Exception {
+    CyclicBarrier barrierA = new CyclicBarrier(2);
+    CyclicBarrier barrierB = new CyclicBarrier(2);
+    CyclicBarrier barrierC = new CyclicBarrier(2);
+    asyncClient = new MockNMClientAsync2(barrierA, barrierB, barrierC);
+    asyncClient.init(new Configuration());
+    asyncClient.start();
+
+    final Container container = mockContainer(1);
+    final ContainerLaunchContext clc =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    // start container from another thread
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        asyncClient.startContainer(container, clc);
+      }
+    };
+    t.start();
+
+    barrierA.await();
+    asyncClient.stopContainer(container.getId(), container.getNodeId(),
+        container.getContainerToken());
+    barrierC.await();
+
+    Assert.assertFalse("Starting and stopping should be out of order",
+        ((TestCallbackHandler2) asyncClient.callbackHandler)
+            .exceptionOccurred.get());
+  }
+
+  /**
+   * {@link NMClientAsync} whose event processors use two barriers to make the
+   * STOP_CONTAINER event run before the START_CONTAINER event of
+   * {@link #testOutOfOrder()}.
+   */
+  private class MockNMClientAsync2 extends NMClientAsync {
+    private final CyclicBarrier barrierA;
+    private final CyclicBarrier barrierB;
+
+    /**
+     * @param barrierA tripped by the START processor and the test thread
+     * @param barrierB tripped by the START and STOP processors
+     * @param barrierC handed to the {@link TestCallbackHandler2}
+     */
+    protected MockNMClientAsync2(CyclicBarrier barrierA, CyclicBarrier barrierB,
+        CyclicBarrier barrierC) throws YarnRemoteException, IOException {
+      super(MockNMClientAsync2.class.getName(), mockNMClient(0),
+          new TestCallbackHandler2(barrierC));
+      this.barrierA = barrierA;
+      this.barrierB = barrierB;
+    }
+
+    private class MockContainerEventProcessor extends ContainerEventProcessor {
+
+      public MockContainerEventProcessor(ContainerEvent event) {
+        super(event);
+      }
+
+      @Override
+      public void run() {
+        try {
+          if (event.getType() == ContainerEventType.START_CONTAINER) {
+            // Tell the test thread the start is in flight, then hold it back
+            // until the stop event has been handled.
+            barrierA.await();
+            barrierB.await();
+          }
+          super.run();
+          if (event.getType() == ContainerEventType.STOP_CONTAINER) {
+            // Stop has been handled; release the waiting START processor.
+            barrierB.await();
+          }
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+          // Keep the interruption visible to the executing thread's owner.
+          Thread.currentThread().interrupt();
+        } catch (BrokenBarrierException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    @Override
+    protected ContainerEventProcessor getContainerEventProcessor(
+        ContainerEvent event) {
+      return new MockContainerEventProcessor(event);
+    }
+  }
+
+  private class TestCallbackHandler2
+      implements NMClientAsync.CallbackHandler {
+    private CyclicBarrier barrierC;
+    private AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
+
+    public TestCallbackHandler2(CyclicBarrier barrierC) {
+      this.barrierC = barrierC;
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      if (!t.getMessage().equals(NMClientAsync.StatefulContainer
+          .OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) {
+        exceptionOccurred.set(true);
+        return;
+      }
+      try {
+        barrierC.await();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      } catch (BrokenBarrierException e) {
+        e.printStackTrace();
+      }
+    }
+
+    @Override
+    public void onGetContainerStatusError(ContainerId containerId,
+        Throwable t) {
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+    }
+
+  }
+
+  private Container mockContainer(int i) {
+    ApplicationId appId =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId = ContainerId.newInstance(attemptId, i);
+    nodeId = NodeId.newInstance("localhost", 0);
+    // Create an empty record
+    containerToken = recordFactory.newRecordInstance(ContainerToken.class);
+    return BuilderUtils.newContainer(
+        containerId, nodeId, null, null, null, containerToken, 0);
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1487184&r1=1487183&r2=1487184&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed May 29 01:41:41 2013
@@ -661,6 +661,13 @@
     <value>30</value>
   </property>
 
+  <property>
+    <description>Max number of threads in NMClientAsync to process container
+    management events</description>
+    <name>yarn.client.nodemanager-client-async.thread-pool-max-size</name>
+    <value>500</value>
+  </property>
+
   <!--Map Reduce configuration-->
   <property>
     <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>