You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2017/05/16 17:48:51 UTC

hadoop git commit: YARN-6306. NMClient API change for container upgrade. Contributed by Arun Suresh

Repository: hadoop
Updated Branches:
  refs/heads/trunk 89a8edc01 -> 8236130b2


YARN-6306. NMClient API change for container upgrade. Contributed by Arun Suresh


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8236130b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8236130b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8236130b

Branch: refs/heads/trunk
Commit: 8236130b2c61ab0ee9b8ed747ce8cf96af7f17aa
Parents: 89a8edc
Author: Jian He <ji...@apache.org>
Authored: Tue May 16 10:48:46 2017 -0700
Committer: Jian He <ji...@apache.org>
Committed: Tue May 16 10:48:46 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/yarn/client/api/NMClient.java |  86 ++++-
 .../yarn/client/api/async/NMClientAsync.java    |  98 ++++-
 .../api/async/impl/NMClientAsyncImpl.java       | 260 +++++++++++++-
 .../yarn/client/api/impl/NMClientImpl.java      |  91 +++++
 .../api/async/impl/TestNMClientAsync.java       | 359 +++++++++++++++----
 .../yarn/client/api/impl/TestNMClient.java      | 141 ++++++++
 .../container/ContainerImpl.java                |   9 +-
 7 files changed, 966 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8236130b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
index 47270f5..c1447ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
@@ -58,6 +58,10 @@ public abstract class NMClient extends AbstractService {
     return client;
   }
 
+  protected enum UpgradeOp {
+    REINIT, RESTART, COMMIT, ROLLBACK
+  }
+
   private NMTokenCache nmTokenCache = NMTokenCache.getSingleton();
 
   @Private
@@ -79,8 +83,8 @@ public abstract class NMClient extends AbstractService {
    *                               <code>NodeManager</code> to launch the
    *                               container
    * @return a map between the auxiliary service names and their outputs
-   * @throws YarnException
-   * @throws IOException
+   * @throws YarnException YarnException.
+   * @throws IOException IOException.
    */
   public abstract Map<String, ByteBuffer> startContainer(Container container,
       ContainerLaunchContext containerLaunchContext)
@@ -95,9 +99,10 @@ public abstract class NMClient extends AbstractService {
    * {@link Container}.
    * </p>
    *
-   * @param container the container with updated token
-   * @throws YarnException
-   * @throws IOException
+   * @param container the container with updated token.
+   *
+   * @throws YarnException YarnException.
+   * @throws IOException IOException.
    */
   public abstract void increaseContainerResource(Container container)
       throws YarnException, IOException;
@@ -107,9 +112,9 @@ public abstract class NMClient extends AbstractService {
    *
    * @param containerId the Id of the started container
    * @param nodeId the Id of the <code>NodeManager</code>
-   * 
-   * @throws YarnException
-   * @throws IOException
+   *
+   * @throws YarnException YarnException.
+   * @throws IOException IOException.
    */
   public abstract void stopContainer(ContainerId containerId, NodeId nodeId)
       throws YarnException, IOException;
@@ -120,14 +125,62 @@ public abstract class NMClient extends AbstractService {
    * @param containerId the Id of the started container
    * @param nodeId the Id of the <code>NodeManager</code>
    * 
-   * @return the status of a container
-   * @throws YarnException
-   * @throws IOException
+   * @return the status of a container.
+   *
+   * @throws YarnException YarnException.
+   * @throws IOException IOException.
    */
   public abstract ContainerStatus getContainerStatus(ContainerId containerId,
       NodeId nodeId) throws YarnException, IOException;
 
   /**
+   * <p>Re-Initialize the Container.</p>
+   *
+   * @param containerId the Id of the container to Re-Initialize.
+   * @param containerLaunchContex the updated ContainerLaunchContext.
+   * @param autoCommit whether to commit the re-initialization automatically.
+   *
+   * @throws YarnException YarnException.
+   * @throws IOException IOException.
+   */
+  public abstract void reInitializeContainer(ContainerId containerId,
+      ContainerLaunchContext containerLaunchContex, boolean autoCommit)
+      throws YarnException, IOException;
+
+  /**
+   * <p>Restart the specified container.</p>
+   *
+   * @param containerId the Id of the container to restart.
+   *
+   * @throws YarnException YarnException.
+   * @throws IOException IOException.
+   */
+  public abstract void restartContainer(ContainerId containerId)
+      throws YarnException, IOException;
+
+  /**
+   * <p>Rollback last reInitialization of the specified container.</p>
+   *
+   * @param containerId the Id of the container to roll back.
+   *
+   * @throws YarnException YarnException.
+   * @throws IOException IOException.
+   */
+  public abstract void rollbackLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException;
+
+  /**
+   * <p>Commit last reInitialization of the specified container.</p>
+   *
+   * @param containerId the Id of the re-initialized container to commit.
+   *
+   * @throws YarnException YarnException.
+   * @throws IOException IOException.
+   */
+  public abstract void commitLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException;
+
+  /**
    * <p>Set whether the containers that are started by this client, and are
    * still running should be stopped when the client stops. By default, the
    * feature should be enabled.</p> However, containers will be stopped only  
@@ -165,4 +218,15 @@ public abstract class NMClient extends AbstractService {
     return nmTokenCache;
   }
 
+  /**
+   * Get the NodeId of the node on which the container is running. It returns
+   * null if the container is not found or if it is not running.
+   *
+   * @param containerId Container Id of the container.
+   * @return NodeId of the node on which the container is running.
+   */
+  public NodeId getNodeIdOfStartedContainer(ContainerId containerId) {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8236130b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
index 8e90564..c94942a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.client.api.async;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
@@ -181,6 +179,38 @@ public abstract class NMClientAsync extends AbstractService {
 
   public abstract void increaseContainerResourceAsync(Container container);
 
+  /**
+   * <p>Re-Initialize the Container.</p>
+   *
+   * @param containerId the Id of the container to Re-Initialize.
+   * @param containerLaunchContex the updated ContainerLaunchContext.
+   * @param autoCommit whether to commit the re-initialization automatically.
+   */
+  public abstract void reInitializeContainerAsync(ContainerId containerId,
+      ContainerLaunchContext containerLaunchContex, boolean autoCommit);
+
+  /**
+   * <p>Restart the specified container.</p>
+   *
+   * @param containerId the Id of the container to restart.
+   */
+  public abstract void restartContainerAsync(ContainerId containerId);
+
+  /**
+   * <p>Rollback last reInitialization of the specified container.</p>
+   *
+   * @param containerId the Id of the container to roll back.
+   */
+  public abstract void rollbackLastReInitializationAsync(
+      ContainerId containerId);
+
+  /**
+   * <p>Commit last reInitialization of the specified container.</p>
+   *
+   * @param containerId the Id of the re-initialized container to commit.
+   */
+  public abstract void commitLastReInitializationAsync(ContainerId containerId);
+
   public abstract void stopContainerAsync(
       ContainerId containerId, NodeId nodeId);
 
@@ -303,6 +333,70 @@ public abstract class NMClientAsync extends AbstractService {
      */
     public abstract void onStopContainerError(
         ContainerId containerId, Throwable t);
+
+    /**
+     * Callback for container re-initialization request.
+     *
+     * @param containerId the Id of the container to be Re-Initialized.
+     */
+    public void onContainerReInitialize(ContainerId containerId) {}
+
+    /**
+     * Callback for container restart.
+     *
+     * @param containerId the Id of the container to restart.
+     */
+    public void onContainerRestart(ContainerId containerId) {}
+
+    /**
+     * Callback for rollback of last re-initialization.
+     *
+     * @param containerId the Id of the container to roll back.
+     */
+    public void onRollbackLastReInitialization(ContainerId containerId) {}
+
+    /**
+     * Callback for commit of last re-initialization.
+     *
+     * @param containerId the Id of the re-initialized container to commit.
+     */
+    public void onCommitLastReInitialization(ContainerId containerId) {}
+
+    /**
+     * Error Callback for container re-initialization request.
+     *
+     * @param containerId the Id of the container to be Re-Initialized.
+     * @param t a Throwable.
+     */
+    public void onContainerReInitializeError(ContainerId containerId,
+        Throwable t) {}
+
+    /**
+     * Error Callback for container restart.
+     *
+     * @param containerId the Id of the container to restart.
+     * @param t a Throwable.
+     *
+     */
+    public void onContainerRestartError(ContainerId containerId, Throwable t) {}
+
+    /**
+     * Error Callback for rollback of last re-initialization.
+     *
+     * @param containerId the Id of the container to roll back.
+     * @param t a Throwable.
+     */
+    public void onRollbackLastReInitializationError(ContainerId containerId,
+        Throwable t) {}
+
+    /**
+     * Error Callback for commit of last re-initialization.
+     *
+     * @param containerId the Id of the re-initialized container to commit.
+     * @param t a Throwable.
+     */
+    public void onCommitLastReInitializationError(ContainerId containerId,
+        Throwable t) {}
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8236130b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
index 575ce13..515a8e8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
@@ -282,6 +282,103 @@ public class NMClientAsyncImpl extends NMClientAsync {
     }
   }
 
+  @Override
+  public void reInitializeContainerAsync(ContainerId containerId,
+      ContainerLaunchContext containerLaunchContex, boolean autoCommit){
+    if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+      LOG.error("Callback handler does not implement container re-initialize "
+          + "callback methods");
+      return;
+    }
+    AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+    if (containers.get(containerId) == null) {
+      handler.onContainerReInitializeError(
+          containerId, RPCUtil.getRemoteException(
+              "Container " + containerId + " is not started"));
+    }
+    try {
+      events.put(new ReInitializeContainerEvevnt(containerId,
+          client.getNodeIdOfStartedContainer(containerId),
+          containerLaunchContex, autoCommit));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of re-initializing of "
+          + "Container " + containerId);
+      handler.onContainerReInitializeError(containerId, e);
+    }
+  }
+
+  @Override
+  public void restartContainerAsync(ContainerId containerId){
+    if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+      LOG.error("Callback handler does not implement container restart "
+          + "callback methods");
+      return;
+    }
+    AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+    if (containers.get(containerId) == null) {
+      handler.onContainerRestartError(
+          containerId, RPCUtil.getRemoteException(
+              "Container " + containerId + " is not started"));
+    }
+    try {
+      events.put(new ContainerEvent(containerId,
+          client.getNodeIdOfStartedContainer(containerId),
+          null, ContainerEventType.RESTART_CONTAINER));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of restart of "
+          + "Container " + containerId);
+      handler.onContainerRestartError(containerId, e);
+    }
+  }
+
+  @Override
+  public void rollbackLastReInitializationAsync(ContainerId containerId){
+    if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+      LOG.error("Callback handler does not implement container rollback "
+          + "callback methods");
+      return;
+    }
+    AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+    if (containers.get(containerId) == null) {
+      handler.onRollbackLastReInitializationError(
+          containerId, RPCUtil.getRemoteException(
+              "Container " + containerId + " is not started"));
+    }
+    try {
+      events.put(new ContainerEvent(containerId,
+          client.getNodeIdOfStartedContainer(containerId),
+          null, ContainerEventType.ROLLBACK_LAST_REINIT));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event Rollback re-initialization"
+          + " of Container " + containerId);
+      handler.onRollbackLastReInitializationError(containerId, e);
+    }
+  }
+
+  @Override
+  public void commitLastReInitializationAsync(ContainerId containerId){
+    if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+      LOG.error("Callback handler does not implement container commit last " +
+          "re-initialization callback methods");
+      return;
+    }
+    AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+    if (containers.get(containerId) == null) {
+      handler.onCommitLastReInitializationError(
+          containerId, RPCUtil.getRemoteException(
+              "Container " + containerId + " is not started"));
+    }
+    try {
+      events.put(new ContainerEvent(containerId,
+          client.getNodeIdOfStartedContainer(containerId),
+          null, ContainerEventType.COMMIT_LAST_REINT));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event Commit re-initialization"
+          + " of Container " + containerId);
+      handler.onCommitLastReInitializationError(containerId, e);
+    }
+  }
+
   public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
     if (containers.get(containerId) == null) {
       callbackHandler.onStopContainerError(containerId,
@@ -330,7 +427,11 @@ public class NMClientAsyncImpl extends NMClientAsync {
     START_CONTAINER,
     STOP_CONTAINER,
     QUERY_CONTAINER,
-    INCREASE_CONTAINER_RESOURCE
+    INCREASE_CONTAINER_RESOURCE,
+    REINITIALIZE_CONTAINER,
+    RESTART_CONTAINER,
+    ROLLBACK_LAST_REINIT,
+    COMMIT_LAST_REINT
   }
 
   protected static class ContainerEvent
@@ -381,6 +482,27 @@ public class NMClientAsyncImpl extends NMClientAsync {
     }
   }
 
+  protected static class ReInitializeContainerEvevnt extends ContainerEvent {
+    private ContainerLaunchContext containerLaunchContext;
+    private boolean autoCommit;
+
+    public ReInitializeContainerEvevnt(ContainerId containerId, NodeId nodeId,
+        ContainerLaunchContext containerLaunchContext, boolean autoCommit) {
+      super(containerId, nodeId, null,
+          ContainerEventType.REINITIALIZE_CONTAINER);
+      this.containerLaunchContext = containerLaunchContext;
+      this.autoCommit = autoCommit;
+    }
+
+    public ContainerLaunchContext getContainerLaunchContext() {
+      return containerLaunchContext;
+    }
+
+    public boolean isAutoCommit() {
+      return autoCommit;
+    }
+  }
+
   protected static class IncreaseContainerResourceEvent extends ContainerEvent {
     private Container container;
 
@@ -416,6 +538,25 @@ public class NMClientAsyncImpl extends NMClientAsync {
             .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
                 ContainerEventType.INCREASE_CONTAINER_RESOURCE,
                 new IncreaseContainerResourceTransition())
+
+            // Transitions for Container Upgrade
+            .addTransition(ContainerState.RUNNING,
+                EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+                ContainerEventType.REINITIALIZE_CONTAINER,
+                new ReInitializeContainerTransition())
+            .addTransition(ContainerState.RUNNING,
+                EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+                ContainerEventType.RESTART_CONTAINER,
+                new ReInitializeContainerTransition())
+            .addTransition(ContainerState.RUNNING,
+                EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+                ContainerEventType.ROLLBACK_LAST_REINIT,
+                new ReInitializeContainerTransition())
+            .addTransition(ContainerState.RUNNING,
+                EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
+                ContainerEventType.COMMIT_LAST_REINT,
+                new ReInitializeContainerTransition())
+
             .addTransition(ContainerState.RUNNING,
                 EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
                 ContainerEventType.STOP_CONTAINER,
@@ -431,6 +572,10 @@ public class NMClientAsyncImpl extends NMClientAsync {
             .addTransition(ContainerState.FAILED, ContainerState.FAILED,
                 EnumSet.of(ContainerEventType.START_CONTAINER,
                     ContainerEventType.STOP_CONTAINER,
+                    ContainerEventType.REINITIALIZE_CONTAINER,
+                    ContainerEventType.RESTART_CONTAINER,
+                    ContainerEventType.COMMIT_LAST_REINT,
+                    ContainerEventType.ROLLBACK_LAST_REINIT,
                     ContainerEventType.INCREASE_CONTAINER_RESOURCE));
 
     protected static class StartContainerTransition implements
@@ -529,6 +674,119 @@ public class NMClientAsyncImpl extends NMClientAsync {
       }
     }
 
+    protected static class ReInitializeContainerTransition implements
+        MultipleArcTransition<StatefulContainer, ContainerEvent,
+            ContainerState> {
+
+      @Override
+      public ContainerState transition(StatefulContainer container,
+          ContainerEvent containerEvent) {
+        ContainerId containerId = containerEvent.getContainerId();
+        AbstractCallbackHandler handler = (AbstractCallbackHandler) container
+                .nmClientAsync.getCallbackHandler();
+        Throwable handlerError = null;
+        try {
+          switch(containerEvent.getType()) {
+          case REINITIALIZE_CONTAINER:
+            if (!(containerEvent instanceof ReInitializeContainerEvevnt)) {
+              LOG.error("Unexpected Event.. [" +containerEvent.getType() + "]");
+              return ContainerState.FAILED;
+            }
+            ReInitializeContainerEvevnt rEvent =
+                (ReInitializeContainerEvevnt)containerEvent;
+            container.nmClientAsync.getClient().reInitializeContainer(
+                containerId, rEvent.getContainerLaunchContext(),
+                rEvent.isAutoCommit());
+            try {
+              handler.onContainerReInitialize(containerId);
+            } catch (Throwable tr) {
+              handlerError = tr;
+            }
+            break;
+          case RESTART_CONTAINER:
+            container.nmClientAsync.getClient().restartContainer(containerId);
+            try {
+              handler.onContainerRestart(containerId);
+            } catch (Throwable tr) {
+              handlerError = tr;
+            }
+            break;
+          case ROLLBACK_LAST_REINIT:
+            container.nmClientAsync.getClient()
+                .rollbackLastReInitialization(containerId);
+            try {
+              handler.onRollbackLastReInitialization(containerId);
+            } catch (Throwable tr) {
+              handlerError = tr;
+            }
+            break;
+          case COMMIT_LAST_REINT:
+            container.nmClientAsync.getClient()
+                .commitLastReInitialization(containerId);
+            try {
+              handler.onCommitLastReInitialization(containerId);
+            } catch (Throwable tr) {
+              handlerError = tr;
+            }
+            break;
+          default:
+            LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
+                " expected here..");
+            break;
+          }
+          if (handlerError != null) {
+            LOG.info("Unchecked exception is thrown in handler for event ["
+                + containerEvent.getType() + "] for Container "
+                + containerId, handlerError);
+          }
+
+          return ContainerState.RUNNING;
+        } catch (Throwable t) {
+          switch(containerEvent.getType()) {
+          case REINITIALIZE_CONTAINER:
+            try {
+              handler.onContainerReInitializeError(containerId, t);
+            } catch (Throwable tr) {
+              handlerError = tr;
+            }
+            break;
+          case RESTART_CONTAINER:
+            try {
+              handler.onContainerRestartError(containerId, t);
+            } catch (Throwable tr) {
+              handlerError = tr;
+            }
+            break;
+          case ROLLBACK_LAST_REINIT:
+            try {
+              handler.onRollbackLastReInitializationError(containerId, t);
+            } catch (Throwable tr) {
+              handlerError = tr;
+            }
+            break;
+          case COMMIT_LAST_REINT:
+            try {
+              handler.onCommitLastReInitializationError(containerId, t);
+            } catch (Throwable tr) {
+              handlerError = tr;
+            }
+            break;
+          default:
+            LOG.warn("Event of type [" + containerEvent.getType() + "] not" +
+                " expected here..");
+            break;
+          }
+          if (handlerError != null) {
+            LOG.info("Unchecked exception is thrown in handler for event ["
+                + containerEvent.getType() + "] for Container "
+                + containerId, handlerError);
+          }
+        }
+
+        return ContainerState.FAILED;
+      }
+    }
+
     protected static class StopContainerTransition implements
         MultipleArcTransition<StatefulContainer, ContainerEvent,
         ContainerState> {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8236130b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
index dc92cda..c81d448 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
@@ -33,10 +33,13 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -306,6 +309,84 @@ public class NMClientImpl extends NMClient {
     }
   }
 
+  @Override
+  public void reInitializeContainer(ContainerId containerId,
+      ContainerLaunchContext containerLaunchContex, boolean autoCommit)
+      throws YarnException, IOException {
+    ContainerManagementProtocolProxyData proxy = null;
+    StartedContainer container = startedContainers.get(containerId);
+    if (container != null) {
+      synchronized (container) {
+        proxy = cmProxy.getProxy(container.getNodeId().toString(), containerId);
+        try {
+          proxy.getContainerManagementProtocol().reInitializeContainer(
+              ReInitializeContainerRequest.newInstance(
+                  containerId, containerLaunchContex, autoCommit));
+        } finally {
+          if (proxy != null) {
+            cmProxy.mayBeCloseProxy(proxy);
+          }
+        }
+      }
+    } else {
+      throw new YarnException("Unknown container [" + containerId + "]");
+    }
+  }
+
+  @Override
+  public void restartContainer(ContainerId containerId)
+      throws YarnException, IOException {
+    restartCommitOrRollbackContainer(containerId, UpgradeOp.RESTART);
+  }
+
+  @Override
+  public void rollbackLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException {
+    restartCommitOrRollbackContainer(containerId, UpgradeOp.ROLLBACK);
+  }
+
+  @Override
+  public void commitLastReInitialization(ContainerId containerId)
+      throws YarnException, IOException {
+    restartCommitOrRollbackContainer(containerId, UpgradeOp.COMMIT);
+  }
+
+
+  private void restartCommitOrRollbackContainer(ContainerId containerId,
+      UpgradeOp upgradeOp) throws YarnException, IOException {
+    ContainerManagementProtocolProxyData proxy = null;
+    StartedContainer container = startedContainers.get(containerId);
+    if (container != null) {
+      synchronized (container) {
+        proxy = cmProxy.getProxy(container.getNodeId().toString(), containerId);
+        ContainerManagementProtocol cmp =
+            proxy.getContainerManagementProtocol();
+        try {
+          switch (upgradeOp) {
+          case RESTART:
+            cmp.restartContainer(containerId);
+            break;
+          case COMMIT:
+            cmp.commitLastReInitialization(containerId);
+            break;
+          case ROLLBACK:
+            cmp.rollbackLastReInitialization(containerId);
+            break;
+          default:
+            // Should not happen..
+            break;
+          }
+        } finally {
+          if (proxy != null) {
+            cmProxy.mayBeCloseProxy(proxy);
+          }
+        }
+      }
+    } else {
+      throw new YarnException("Unknown container [" + containerId + "]");
+    }
+  }
+
   private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
       throws IOException, YarnException {
     ContainerManagementProtocolProxyData proxy = null;
@@ -343,4 +424,14 @@ public class NMClientImpl extends NMClient {
       throw (IOException) t;
     }
   }
+
+  @Override
+  public NodeId getNodeIdOfStartedContainer(ContainerId containerId) {
+    StartedContainer container = startedContainers.get(containerId);
+    if (container != null) {
+      return container.getNodeId();
+    }
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8236130b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
index 48f3431..dda3eec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.client.api.async.impl;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -27,6 +28,8 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -69,6 +73,22 @@ public class TestNMClientAsync {
   private NodeId nodeId;
   private Token containerToken;
 
+  enum OpsToTest {
+    START, QUERY, STOP, INCR, REINIT, RESTART, ROLLBACK, COMMIT
+  }
+
+  final static class TestData {
+    AtomicInteger success = new AtomicInteger(0);
+    AtomicInteger failure = new AtomicInteger(0);
+    final AtomicIntegerArray successArray;
+    final AtomicIntegerArray failureArray;
+
+    private TestData(int expectedSuccess, int expectedFailure) {
+      this.successArray = new AtomicIntegerArray(expectedSuccess);
+      this.failureArray = new AtomicIntegerArray(expectedFailure);
+    }
+  }
+
   @After
   public void teardown() {
     ServiceOperations.stop(asyncClient);
@@ -194,25 +214,7 @@ public class TestNMClientAsync {
     private int expectedSuccess;
     private int expectedFailure;
 
-    private AtomicInteger actualStartSuccess = new AtomicInteger(0);
-    private AtomicInteger actualStartFailure = new AtomicInteger(0);
-    private AtomicInteger actualQuerySuccess = new AtomicInteger(0);
-    private AtomicInteger actualQueryFailure = new AtomicInteger(0);
-    private AtomicInteger actualStopSuccess = new AtomicInteger(0);
-    private AtomicInteger actualStopFailure = new AtomicInteger(0);
-    private AtomicInteger actualIncreaseResourceSuccess =
-        new AtomicInteger(0);
-    private AtomicInteger actualIncreaseResourceFailure =
-        new AtomicInteger(0);
-
-    private AtomicIntegerArray actualStartSuccessArray;
-    private AtomicIntegerArray actualStartFailureArray;
-    private AtomicIntegerArray actualQuerySuccessArray;
-    private AtomicIntegerArray actualQueryFailureArray;
-    private AtomicIntegerArray actualStopSuccessArray;
-    private AtomicIntegerArray actualStopFailureArray;
-    private AtomicIntegerArray actualIncreaseResourceSuccessArray;
-    private AtomicIntegerArray actualIncreaseResourceFailureArray;
+    private final Map<OpsToTest, TestData> testMap = new HashMap<>();
 
     private Set<String> errorMsgs =
         Collections.synchronizedSet(new HashSet<String>());
@@ -221,16 +223,9 @@ public class TestNMClientAsync {
       this.expectedSuccess = expectedSuccess;
       this.expectedFailure = expectedFailure;
 
-      actualStartSuccessArray = new AtomicIntegerArray(expectedSuccess);
-      actualStartFailureArray = new AtomicIntegerArray(expectedFailure);
-      actualQuerySuccessArray = new AtomicIntegerArray(expectedSuccess);
-      actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
-      actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
-      actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
-      actualIncreaseResourceSuccessArray =
-          new AtomicIntegerArray(expectedSuccess);
-      actualIncreaseResourceFailureArray =
-          new AtomicIntegerArray(expectedFailure);
+      for (OpsToTest op : OpsToTest.values()) {
+        testMap.put(op, new TestData(expectedSuccess, expectedFailure));
+      }
     }
 
     @SuppressWarnings("deprecation")
@@ -243,8 +238,9 @@ public class TestNMClientAsync {
               " should throw the exception onContainerStarted");
           return;
         }
-        actualStartSuccess.addAndGet(1);
-        actualStartSuccessArray.set(containerId.getId(), 1);
+        TestData td = testMap.get(OpsToTest.START);
+        td.success.addAndGet(1);
+        td.successArray.set(containerId.getId(), 1);
 
         // move on to the following success tests
         asyncClient.getContainerStatusAsync(containerId, nodeId);
@@ -254,7 +250,28 @@ public class TestNMClientAsync {
         // containerId
         Container container = Container.newInstance(
             containerId, nodeId, null, null, null, containerToken);
-        asyncClient.increaseContainerResourceAsync(container);
+        int t = containerId.getId() % 5;
+        switch (t) {
+        case 0:
+          asyncClient.increaseContainerResourceAsync(container);
+          break;
+        case 1:
+          asyncClient.reInitializeContainerAsync(containerId,
+              recordFactory.newRecordInstance(ContainerLaunchContext.class),
+              true);
+          break;
+        case 2:
+          asyncClient.restartContainerAsync(containerId);
+          break;
+        case 3:
+          asyncClient.rollbackLastReInitializationAsync(containerId);
+          break;
+        case 4:
+          asyncClient.commitLastReInitializationAsync(containerId);
+          break;
+        default:
+          break;
+        }
       }
 
       // Shouldn't crash the test thread
@@ -270,8 +287,9 @@ public class TestNMClientAsync {
             " should throw the exception onContainerStatusReceived");
         return;
       }
-      actualQuerySuccess.addAndGet(1);
-      actualQuerySuccessArray.set(containerId.getId(), 1);
+      TestData td = testMap.get(OpsToTest.QUERY);
+      td.success.addAndGet(1);
+      td.successArray.set(containerId.getId(), 1);
       // move on to the following success tests
       // make sure we pass in the container with the same
       // containerId
@@ -292,8 +310,78 @@ public class TestNMClientAsync {
             " should throw the exception onContainerResourceIncreased");
         return;
       }
-      actualIncreaseResourceSuccess.addAndGet(1);
-      actualIncreaseResourceSuccessArray.set(containerId.getId(), 1);
+      TestData td = testMap.get(OpsToTest.INCR);
+      td.success.addAndGet(1);
+      td.successArray.set(containerId.getId(), 1);
+      // move on to the following success tests
+      asyncClient.reInitializeContainerAsync(containerId,
+          Records.newRecord(ContainerLaunchContext.class), true);
+      // throw a fake user exception, and shouldn't crash the test
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onContainerReInitialize(ContainerId containerId) {
+      if (containerId.getId() >= expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " should throw the exception onContainerReInitialize");
+        return;
+      }
+      TestData td = testMap.get(OpsToTest.REINIT);
+      td.success.addAndGet(1);
+      td.successArray.set(containerId.getId(), 1);
+      // move on to the following success tests
+      asyncClient.restartContainerAsync(containerId);
+      // throw a fake user exception, and shouldn't crash the test
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onContainerRestart(ContainerId containerId) {
+      if (containerId.getId() >= expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " should throw the exception onContainerReInitialize");
+        return;
+      }
+      TestData td = testMap.get(OpsToTest.RESTART);
+      td.success.addAndGet(1);
+      td.successArray.set(containerId.getId(), 1);
+      // move on to the following success tests
+      asyncClient.rollbackLastReInitializationAsync(containerId);
+      // throw a fake user exception, and shouldn't crash the test
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onRollbackLastReInitialization(ContainerId containerId) {
+      if (containerId.getId() >= expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " should throw the exception onContainerReInitialize");
+        return;
+      }
+      TestData td = testMap.get(OpsToTest.ROLLBACK);
+      td.success.addAndGet(1);
+      td.successArray.set(containerId.getId(), 1);
+      // move on to the following success tests
+      asyncClient.commitLastReInitializationAsync(containerId);
+      // throw a fake user exception, and shouldn't crash the test
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onCommitLastReInitialization(ContainerId containerId) {
+      if (containerId.getId() >= expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " should throw the exception onContainerReInitialize");
+        return;
+      }
+      TestData td = testMap.get(OpsToTest.COMMIT);
+      td.success.addAndGet(1);
+      td.successArray.set(containerId.getId(), 1);
       // move on to the following success tests
       asyncClient.stopContainerAsync(containerId, nodeId);
       // throw a fake user exception, and shouldn't crash the test
@@ -308,8 +396,9 @@ public class TestNMClientAsync {
             " should throw the exception onContainerStopped");
         return;
       }
-      actualStopSuccess.addAndGet(1);
-      actualStopSuccessArray.set(containerId.getId(), 1);
+      TestData td = testMap.get(OpsToTest.STOP);
+      td.success.addAndGet(1);
+      td.successArray.set(containerId.getId(), 1);
 
       // Shouldn't crash the test thread
       throw new RuntimeException("Ignorable Exception");
@@ -330,8 +419,9 @@ public class TestNMClientAsync {
             " shouldn't throw the exception onStartContainerError");
         return;
       }
-      actualStartFailure.addAndGet(1);
-      actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
+      TestData td = testMap.get(OpsToTest.START);
+      td.failure.addAndGet(1);
+      td.failureArray.set(containerId.getId() - expectedSuccess, 1);
       // move on to the following failure tests
       asyncClient.getContainerStatusAsync(containerId, nodeId);
 
@@ -348,8 +438,9 @@ public class TestNMClientAsync {
             " shouldn't throw the exception onIncreaseContainerResourceError");
         return;
       }
-      actualIncreaseResourceFailure.addAndGet(1);
-      actualIncreaseResourceFailureArray.set(
+      TestData td = testMap.get(OpsToTest.INCR);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
           containerId.getId() - expectedSuccess - expectedFailure, 1);
       // increase container resource error should NOT change the
       // the container status to FAILED
@@ -361,6 +452,102 @@ public class TestNMClientAsync {
 
     @SuppressWarnings("deprecation")
     @Override
+    public void onContainerReInitializeError(ContainerId containerId,
+        Throwable t) {
+      if (containerId.getId() < expectedSuccess + expectedFailure) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onContainerReInitializeError");
+        return;
+      }
+      TestData td = testMap.get(OpsToTest.REINIT);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+      // increment the stop counters here.. since the container will fail
+      td = testMap.get(OpsToTest.STOP);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+      // reInit container changes the container status to FAILED
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onContainerRestartError(ContainerId containerId, Throwable t) {
+      if (containerId.getId() < expectedSuccess + expectedFailure) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onContainerRestartError");
+        return;
+      }
+      TestData td = testMap.get(OpsToTest.RESTART);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+      // increment the stop counters here.. since the container will fail
+      td = testMap.get(OpsToTest.STOP);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+      // restart container changes the container status to FAILED
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onRollbackLastReInitializationError(ContainerId containerId,
+        Throwable t) {
+      if (containerId.getId() < expectedSuccess + expectedFailure) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception" +
+            " onRollbackLastReInitializationError");
+        return;
+      }
+      TestData td = testMap.get(OpsToTest.ROLLBACK);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+      // increment the stop counters here.. since the container will fail
+      td = testMap.get(OpsToTest.STOP);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+      // rollback container changes the container status to FAILED
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onCommitLastReInitializationError(ContainerId containerId,
+        Throwable t) {
+      if (containerId.getId() < expectedSuccess + expectedFailure) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onCommitLastReInitializationError");
+        return;
+      }
+      TestData td = testMap.get(OpsToTest.COMMIT);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+
+      // increment the stop counters here.. since the container will fail
+      td = testMap.get(OpsToTest.STOP);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+      // commit container changes the container status to FAILED
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
     public void onStopContainerError(ContainerId containerId, Throwable t) {
       if (t instanceof RuntimeException) {
         errorMsgs.add("Unexpected throwable from callback functions should be" +
@@ -371,9 +558,9 @@ public class TestNMClientAsync {
             " shouldn't throw the exception onStopContainerError");
         return;
       }
-
-      actualStopFailure.addAndGet(1);
-      actualStopFailureArray.set(
+      TestData td = testMap.get(OpsToTest.STOP);
+      td.failure.addAndGet(1);
+      td.failureArray.set(
           containerId.getId() - expectedSuccess - expectedFailure, 1);
 
       // Shouldn't crash the test thread
@@ -393,8 +580,9 @@ public class TestNMClientAsync {
             " shouldn't throw the exception onGetContainerStatusError");
         return;
       }
-      actualQueryFailure.addAndGet(1);
-      actualQueryFailureArray.set(containerId.getId() - expectedSuccess, 1);
+      TestData td = testMap.get(OpsToTest.QUERY);
+      td.failure.addAndGet(1);
+      td.failureArray.set(containerId.getId() - expectedSuccess, 1);
 
       // Shouldn't crash the test thread
       throw new RuntimeException("Ignorable Exception");
@@ -402,44 +590,67 @@ public class TestNMClientAsync {
 
     public boolean isAllSuccessCallsExecuted() {
       boolean isAllSuccessCallsExecuted =
-          actualStartSuccess.get() == expectedSuccess &&
-          actualQuerySuccess.get() == expectedSuccess &&
-          actualIncreaseResourceSuccess.get() == expectedSuccess &&
-          actualStopSuccess.get() == expectedSuccess;
+          testMap.get(OpsToTest.START).success.get() == expectedSuccess &&
+              testMap.get(OpsToTest.QUERY).success.get() == expectedSuccess &&
+              testMap.get(OpsToTest.INCR).success.get() == expectedSuccess &&
+              testMap.get(OpsToTest.REINIT).success.get() == expectedSuccess &&
+              testMap.get(OpsToTest.RESTART).success.get() == expectedSuccess &&
+              testMap.get(OpsToTest.ROLLBACK).success.get() ==
+                  expectedSuccess &&
+              testMap.get(OpsToTest.COMMIT).success.get() == expectedSuccess &&
+              testMap.get(OpsToTest.STOP).success.get() == expectedSuccess;
       if (isAllSuccessCallsExecuted) {
-        assertAtomicIntegerArray(actualStartSuccessArray);
-        assertAtomicIntegerArray(actualQuerySuccessArray);
-        assertAtomicIntegerArray(actualIncreaseResourceSuccessArray);
-        assertAtomicIntegerArray(actualStopSuccessArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.START).successArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.QUERY).successArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.INCR).successArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.REINIT).successArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.RESTART).successArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.ROLLBACK).successArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.COMMIT).successArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.STOP).successArray);
       }
       return isAllSuccessCallsExecuted;
     }
 
     public boolean isStartAndQueryFailureCallsExecuted() {
       boolean isStartAndQueryFailureCallsExecuted =
-          actualStartFailure.get() == expectedFailure &&
-          actualQueryFailure.get() == expectedFailure;
+          testMap.get(OpsToTest.START).failure.get() == expectedFailure &&
+              testMap.get(OpsToTest.QUERY).failure.get() == expectedFailure;
       if (isStartAndQueryFailureCallsExecuted) {
-        assertAtomicIntegerArray(actualStartFailureArray);
-        assertAtomicIntegerArray(actualQueryFailureArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.START).failureArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.QUERY).failureArray);
       }
       return isStartAndQueryFailureCallsExecuted;
     }
 
     public boolean isIncreaseResourceFailureCallsExecuted() {
       boolean isIncreaseResourceFailureCallsExecuted =
-          actualIncreaseResourceFailure.get() == expectedFailure;
+          testMap.get(OpsToTest.INCR).failure.get()
+              + testMap.get(OpsToTest.REINIT).failure.get()
+              + testMap.get(OpsToTest.RESTART).failure.get()
+              + testMap.get(OpsToTest.ROLLBACK).failure.get()
+              + testMap.get(OpsToTest.COMMIT).failure.get()
+              == expectedFailure;
       if (isIncreaseResourceFailureCallsExecuted) {
-        assertAtomicIntegerArray(actualIncreaseResourceFailureArray);
+        AtomicIntegerArray testArray =
+            new AtomicIntegerArray(
+                testMap.get(OpsToTest.INCR).failureArray.length());
+        for (int i = 0; i < testArray.length(); i++) {
+          for (OpsToTest op : EnumSet.of(OpsToTest.REINIT, OpsToTest.RESTART,
+              OpsToTest.ROLLBACK, OpsToTest.COMMIT, OpsToTest.INCR)) {
+            testArray.addAndGet(i, testMap.get(op).failureArray.get(i));
+          }
+        }
+        assertAtomicIntegerArray(testArray);
       }
       return isIncreaseResourceFailureCallsExecuted;
     }
 
     public boolean isStopFailureCallsExecuted() {
       boolean isStopFailureCallsExecuted =
-          actualStopFailure.get() == expectedFailure;
+          testMap.get(OpsToTest.STOP).failure.get() == expectedFailure;
       if (isStopFailureCallsExecuted) {
-        assertAtomicIntegerArray(actualStopFailureArray);
+        assertAtomicIntegerArray(testMap.get(OpsToTest.STOP).failureArray);
       }
       return isStopFailureCallsExecuted;
     }
@@ -464,6 +675,14 @@ public class TestNMClientAsync {
                 recordFactory.newRecordInstance(ContainerStatus.class));
         doNothing().when(client).increaseContainerResource(
             any(Container.class));
+        doNothing().when(client).reInitializeContainer(
+            any(ContainerId.class), any(ContainerLaunchContext.class),
+            anyBoolean());
+        doNothing().when(client).restartContainer(any(ContainerId.class));
+        doNothing().when(client).rollbackLastReInitialization(
+            any(ContainerId.class));
+        doNothing().when(client).commitLastReInitialization(
+            any(ContainerId.class));
         doNothing().when(client).stopContainer(any(ContainerId.class),
             any(NodeId.class));
         break;
@@ -485,9 +704,23 @@ public class TestNMClientAsync {
                 recordFactory.newRecordInstance(ContainerStatus.class));
         doThrow(RPCUtil.getRemoteException("Increase Resource Exception"))
             .when(client).increaseContainerResource(any(Container.class));
+        doThrow(RPCUtil.getRemoteException("ReInitialize Exception"))
+            .when(client).reInitializeContainer(
+            any(ContainerId.class), any(ContainerLaunchContext.class),
+            anyBoolean());
+        doThrow(RPCUtil.getRemoteException("Restart Exception"))
+            .when(client).restartContainer(any(ContainerId.class));
+        doThrow(RPCUtil.getRemoteException("Rollback upgrade Exception"))
+            .when(client).rollbackLastReInitialization(
+            any(ContainerId.class));
+        doThrow(RPCUtil.getRemoteException("Commit upgrade Exception"))
+            .when(client).commitLastReInitialization(
+            any(ContainerId.class));
         doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
             .stopContainer(any(ContainerId.class), any(NodeId.class));
     }
+    when(client.getNodeIdOfStartedContainer(any(ContainerId.class)))
+        .thenReturn(NodeId.newInstance("localhost", 0));
     return client;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8236130b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
index d211d6d..1034f7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -69,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -310,6 +312,36 @@ public class TestNMClient {
             e.getMessage().contains("is not handled by this NodeManager"));
       }
 
+      // restart shouldn't be called before startContainer,
+      // otherwise, NodeManager cannot find the container
+      try {
+        nmClient.restartContainer(container.getId());
+        fail("Exception is expected");
+      } catch (YarnException e) {
+        assertTrue("The thrown exception is not expected",
+            e.getMessage().contains("Unknown container"));
+      }
+
+      // rollback shouldn't be called before startContainer,
+      // otherwise, NodeManager cannot find the container
+      try {
+        nmClient.rollbackLastReInitialization(container.getId());
+        fail("Exception is expected");
+      } catch (YarnException e) {
+        assertTrue("The thrown exception is not expected",
+            e.getMessage().contains("Unknown container"));
+      }
+
+      // commit shouldn't be called before startContainer,
+      // otherwise, NodeManager cannot find the container
+      try {
+        nmClient.commitLastReInitialization(container.getId());
+        fail("Exception is expected");
+      } catch (YarnException e) {
+        assertTrue("The thrown exception is not expected",
+            e.getMessage().contains("Unknown container"));
+      }
+
       // stopContainer shouldn't be called before startContainer,
       // otherwise, an exception will be thrown
       try {
@@ -353,6 +385,28 @@ public class TestNMClient {
         // Test increase container API and make sure requests can reach NM
         testIncreaseContainerResource(container);
 
+        testRestartContainer(container.getId());
+        testGetContainerStatus(container, i, ContainerState.RUNNING,
+            "will be Restarted", Arrays.asList(new Integer[] {-1000}));
+
+        if (i % 2 == 0) {
+          testReInitializeContainer(container.getId(), clc, false);
+          testGetContainerStatus(container, i, ContainerState.RUNNING,
+              "will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
+          testRollbackContainer(container.getId(), false);
+          testGetContainerStatus(container, i, ContainerState.RUNNING,
+              "will be Rolled-back", Arrays.asList(new Integer[] {-1000}));
+          testCommitContainer(container.getId(), true);
+          testReInitializeContainer(container.getId(), clc, false);
+          testCommitContainer(container.getId(), false);
+        } else {
+          testReInitializeContainer(container.getId(), clc, true);
+          testGetContainerStatus(container, i, ContainerState.RUNNING,
+              "will be Re-initialized", Arrays.asList(new Integer[] {-1000}));
+          testRollbackContainer(container.getId(), true);
+          testCommitContainer(container.getId(), true);
+        }
+
         try {
           nmClient.stopContainer(container.getId(), container.getNodeId());
         } catch (YarnException e) {
@@ -432,4 +486,91 @@ public class TestNMClient {
       }
     }
   }
+
+  private void testRestartContainer(ContainerId containerId)
+      throws YarnException, IOException {
+    try {
+      sleep(250);
+      nmClient.restartContainer(containerId);
+      sleep(250);
+    } catch (YarnException e) {
+      // Tolerate only the "not in RUNNING state" rejection of the restart
+      // request; any other YarnException fails the test.
+      if (!e.getMessage().contains(
+          "can only be changed when a container is in RUNNING state")) {
+        throw (AssertionError)
+            (new AssertionError("Exception is not expected: " + e)
+                .initCause(e));
+      }
+    }
+  }
+
+  private void testRollbackContainer(ContainerId containerId,
+      boolean notRollbackable) throws YarnException, IOException {
+    try {
+      sleep(250);
+      nmClient.rollbackLastReInitialization(containerId);
+      if (notRollbackable) {
+        fail("Should not be able to rollback..");
+      }
+      sleep(250);
+    } catch (YarnException e) {
+      // When rollback is not possible, expect "Nothing to rollback to";
+      // otherwise tolerate only the "not in RUNNING state" rejection.
+      if (notRollbackable) {
+        Assert.assertTrue(e.getMessage().contains(
+            "Nothing to rollback to"));
+      } else {
+        if (!e.getMessage().contains(
+            "can only be changed when a container is in RUNNING state")) {
+          throw (AssertionError)
+              (new AssertionError("Exception is not expected: " + e)
+                  .initCause(e));
+        }
+      }
+    }
+  }
+
+  private void testCommitContainer(ContainerId containerId,
+      boolean notCommittable) throws YarnException, IOException {
+    try {
+      nmClient.commitLastReInitialization(containerId);
+      if (notCommittable) {
+        fail("Should not be able to commit..");
+      }
+    } catch (YarnException e) {
+      // When commit is not possible, expect "Nothing to Commit";
+      // otherwise tolerate only the "not in RUNNING state" rejection.
+      if (notCommittable) {
+        Assert.assertTrue(e.getMessage().contains(
+            "Nothing to Commit"));
+      } else {
+        if (!e.getMessage().contains(
+            "can only be changed when a container is in RUNNING state")) {
+          throw (AssertionError)
+              (new AssertionError("Exception is not expected: " + e)
+                  .initCause(e));
+        }
+      }
+    }
+  }
+
+  private void testReInitializeContainer(ContainerId containerId,
+      ContainerLaunchContext clc, boolean autoCommit)
+      throws YarnException, IOException {
+    try {
+      sleep(250);
+      nmClient.reInitializeContainer(containerId, clc, autoCommit);
+      sleep(250);
+    } catch (YarnException e) {
+      // Tolerate only the "not in RUNNING state" rejection of the
+      // re-initialize request; any other YarnException fails the test.
+      if (!e.getMessage().contains(
+          "can only be changed when a container is in RUNNING state")) {
+        throw (AssertionError)
+            (new AssertionError("Exception is not expected: " + e)
+                .initCause(e));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8236130b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 055e12c..46f8fa0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -22,8 +22,10 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Date;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -137,6 +139,8 @@ public class ContainerImpl implements Container {
     }
   }
 
+  private final SimpleDateFormat dateFormat =
+      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
   private final Lock readLock;
   private final Lock writeLock;
   private final Dispatcher dispatcher;
@@ -767,7 +771,7 @@ public class ContainerImpl implements Container {
 
   private void addDiagnostics(String... diags) {
     for (String s : diags) {
-      this.diagnostics.append(s);
+      this.diagnostics.append("[" + dateFormat.format(new Date()) + "]" + s);
     }
     if (diagnostics.length() > diagnosticsMaxSize) {
       diagnostics.delete(0, diagnostics.length() - diagnosticsMaxSize);
@@ -991,6 +995,7 @@ public class ContainerImpl implements Container {
         // We also need to make sure that if Rollback is possible, the
         // rollback state should be retained in the
         // oldLaunchContext and oldResourceSet
+        container.addDiagnostics("Container will be Restarted.\n");
         return new ReInitializationContext(
             container.launchContext, container.resourceSet,
             container.canRollback() ?
@@ -998,6 +1003,7 @@ public class ContainerImpl implements Container {
             container.canRollback() ?
                 container.reInitContext.oldResourceSet : null);
       } else {
+        container.addDiagnostics("Container will be Re-initialized.\n");
         return new ReInitializationContext(
             reInitEvent.getReInitLaunchContext(),
             reInitEvent.getResourceSet(),
@@ -1018,6 +1024,7 @@ public class ContainerImpl implements Container {
     @Override
     protected ReInitializationContext createReInitContext(ContainerImpl
         container, ContainerEvent event) {
+      container.addDiagnostics("Container upgrade will be Rolled-back.\n");
       LOG.warn("Container [" + container.getContainerId() + "]" +
           " about to be explicitly Rolledback !!");
       return container.reInitContext.createContextForRollback();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org