You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/28 18:31:56 UTC

[1/4] hadoop git commit: YARN-5292. NM Container lifecycle and state transitions to support for PAUSED container state. (Hitesh Sharma via asuresh)

Repository: hadoop
Updated Branches:
  refs/heads/branch-3.0 2223393ad -> 59453dad8


YARN-5292. NM Container lifecycle and state transitions to support for PAUSED container state. (Hitesh Sharma via asuresh)

(cherry picked from commit 864fbacd4548004b1de8b0812627976acd22aff5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd51a746
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd51a746
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd51a746

Branch: refs/heads/branch-3.0
Commit: dd51a7460134b9555dfa5f87fcbb5e5ede8deae0
Parents: 2223393
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 9 07:51:03 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Sep 28 11:22:58 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/api/records/ContainerState.java |   7 +-
 .../src/main/proto/yarn_protos.proto            |   1 +
 .../server/nodemanager/ContainerExecutor.java   |  22 +++
 .../container/ContainerEventType.java           |   6 +-
 .../container/ContainerImpl.java                | 170 ++++++++++++++++++-
 .../container/ContainerPauseEvent.java          |  40 +++++
 .../container/ContainerResumeEvent.java         |  39 +++++
 .../container/ContainerState.java               |   3 +-
 .../launcher/ContainerLaunch.java               |  90 +++++++++-
 .../launcher/ContainersLauncher.java            |  32 ++++
 .../launcher/ContainersLauncherEventType.java   |   3 +
 .../scheduler/ContainerSchedulerEventType.java  |   1 +
 .../container/TestContainer.java                |  51 ++++++
 13 files changed, 454 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
index 696fe06..45e5bd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerState.java
@@ -33,11 +33,14 @@ public enum ContainerState {
   
   /** Running container */
   RUNNING, 
-  
+
   /** Completed container */
   COMPLETE,
 
   /** Scheduled (awaiting resources) at the NM. */
   @InterfaceStability.Unstable
-  SCHEDULED
+  SCHEDULED,
+
+  /** Paused at the NM. */
+  PAUSED
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 8354fd2..c2b0e5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -83,6 +83,7 @@ enum ContainerStateProto {
   C_RUNNING = 2;
   C_COMPLETE = 3;
   C_SCHEDULED = 4;
+  C_PAUSED = 5;
 }
 
 message ContainerProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 072cca7..da50d7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -700,6 +700,28 @@ public abstract class ContainerExecutor implements Configurable {
   }
 
   /**
+   * Pause the container. The default implementation logs a warning and throws
+   * UnsupportedOperationException; executors that support pausing override it.
+   * @param container
+   *          the Container
+   */
+  public void pauseContainer(Container container) {
+    LOG.warn(container.getContainerId() + " doesn't support pausing.");
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Resume the container from pause state. The default implementation logs a
+   * warning and throws UnsupportedOperationException; executors can override.
+   * @param container
+   *          the Container
+   */
+  public void resumeContainer(Container container) {
+    LOG.warn(container.getContainerId() + " doesn't support resume.");
+    throw new UnsupportedOperationException();
+  }
+
+  /**
    * Get the process-identifier for the container.
    *
    * @param containerID the container ID

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index afea0e6..1475435 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -27,6 +27,8 @@ public enum ContainerEventType {
   CONTAINER_DONE,
   REINITIALIZE_CONTAINER,
   ROLLBACK_REINIT,
+  PAUSE_CONTAINER,
+  RESUME_CONTAINER,
 
   // DownloadManager
   CONTAINER_INITED,
@@ -38,5 +40,7 @@ public enum ContainerEventType {
   CONTAINER_LAUNCHED,
   CONTAINER_EXITED_WITH_SUCCESS,
   CONTAINER_EXITED_WITH_FAILURE,
-  CONTAINER_KILLED_ON_REQUEST
+  CONTAINER_KILLED_ON_REQUEST,
+  CONTAINER_PAUSED,
+  CONTAINER_RESUMED
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index e028f38..6f90cdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -305,6 +305,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.NEW, ContainerState.DONE,
         ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
+    .addTransition(ContainerState.NEW, ContainerState.DONE,
+            ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
 
     // From LOCALIZING State
     .addTransition(ContainerState.LOCALIZING,
@@ -320,6 +322,8 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER,
         new KillBeforeRunningTransition())
+    .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
+        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
 
     // From LOCALIZATION_FAILED State
     .addTransition(ContainerState.LOCALIZATION_FAILED,
@@ -333,7 +337,8 @@ public class ContainerImpl implements Container {
     // container not launched so kill is a no-op
     .addTransition(ContainerState.LOCALIZATION_FAILED,
         ContainerState.LOCALIZATION_FAILED,
-        ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.PAUSE_CONTAINER))
     // container cleanup triggers a release of all resources
     // regardless of whether they were localized or not
     // LocalizedResource handles release event in all states
@@ -389,6 +394,76 @@ public class ContainerImpl implements Container {
         ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
         new KilledExternallyTransition())
+    .addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
+    ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
+
+    // From PAUSING State
+    .addTransition(ContainerState.PAUSING, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    .addTransition(ContainerState.PAUSING, ContainerState.PAUSED,
+        ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
+    // In case something goes wrong then container will exit from the
+    // PAUSING state
+    .addTransition(ContainerState.PAUSING,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)
+    .addTransition(ContainerState.PAUSING,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition(true))
+    .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        new KilledExternallyTransition())
+
+    // From PAUSED State
+    .addTransition(ContainerState.PAUSED, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+        ContainerEventType.PAUSE_CONTAINER)
+    .addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
+        ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
+    // In case something goes wrong then container will exit from the
+    // PAUSED state
+    .addTransition(ContainerState.PAUSED,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition(true))
+    .addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        new KilledExternallyTransition())
+    .addTransition(ContainerState.PAUSED,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+        new ExitedWithSuccessTransition(true))
+
+    // From RESUMING State
+    .addTransition(ContainerState.RESUMING, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.RESUMING, ContainerState.RUNNING,
+        ContainerEventType.CONTAINER_RESUMED)
+    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    // In case something goes wrong then container will exit from the
+    // RESUMING state
+    .addTransition(ContainerState.RESUMING,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition(true))
+    .addTransition(ContainerState.RESUMING,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        new KilledExternallyTransition())
+    .addTransition(ContainerState.RESUMING,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+        new ExitedWithSuccessTransition(true))
 
     // From REINITIALIZING State
     .addTransition(ContainerState.REINITIALIZING,
@@ -412,6 +487,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
+        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
     .addTransition(ContainerState.REINITIALIZING,
         ContainerState.SCHEDULED,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
@@ -429,6 +506,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
+        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
 
     // From CONTAINER_EXITED_WITH_SUCCESS State
     .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
@@ -440,7 +519,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.EXITED_WITH_SUCCESS,
         ContainerState.EXITED_WITH_SUCCESS,
-        ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.PAUSE_CONTAINER))
 
     // From EXITED_WITH_FAILURE State
     .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@@ -452,7 +532,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.EXITED_WITH_FAILURE,
                    ContainerState.EXITED_WITH_FAILURE,
-                   ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.PAUSE_CONTAINER))
 
     // From KILLING State.
     .addTransition(ContainerState.KILLING,
@@ -486,7 +567,8 @@ public class ContainerImpl implements Container {
     // in the container launcher
     .addTransition(ContainerState.KILLING,
         ContainerState.KILLING,
-        ContainerEventType.CONTAINER_LAUNCHED)
+        EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
+            ContainerEventType.PAUSE_CONTAINER))
 
     // From CONTAINER_CLEANEDUP_AFTER_KILL State.
     .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -502,11 +584,13 @@ public class ContainerImpl implements Container {
         EnumSet.of(ContainerEventType.KILL_CONTAINER,
             ContainerEventType.RESOURCE_FAILED,
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
-            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+            ContainerEventType.PAUSE_CONTAINER))
 
     // From DONE
     .addTransition(ContainerState.DONE, ContainerState.DONE,
-        ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.PAUSE_CONTAINER))
     .addTransition(ContainerState.DONE, ContainerState.DONE,
         ContainerEventType.INIT_CONTAINER)
     .addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -532,6 +616,8 @@ public class ContainerImpl implements Container {
     case LOCALIZING:
     case LOCALIZATION_FAILED:
     case SCHEDULED:
+    case PAUSED:
+    case RESUMING:
       return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
     case RUNNING:
     case RELAUNCHING:
@@ -541,6 +627,7 @@ public class ContainerImpl implements Container {
     case KILLING:
     case CONTAINER_CLEANEDUP_AFTER_KILL:
     case CONTAINER_RESOURCES_CLEANINGUP:
+    case PAUSING:
       return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
     case DONE:
     default:
@@ -1499,6 +1586,26 @@ public class ContainerImpl implements Container {
   }
 
   /**
+   * Transitions upon receiving PAUSE_CONTAINER.
+   * - NEW -> DONE; LOCALIZING -> KILLING.
+   * - REINITIALIZING -> KILLING; RELAUNCHING -> KILLING.
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  static class KillOnPauseTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Request container cleanup (kill) from the launcher instead of pausing
+      container.setIsReInitializing(false);
+      container.dispatcher.getEventHandler().handle(
+          new ContainersLauncherEvent(container,
+              ContainersLauncherEventType.CLEANUP_CONTAINER));
+    }
+  }
+
+  /**
    * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
    * upon receiving CONTAINER_KILLED_ON_REQUEST.
    */
@@ -1688,6 +1795,57 @@ public class ContainerImpl implements Container {
     }
   }
 
+  /**
+   * Transitions upon receiving PAUSE_CONTAINER.
+   * - RUNNING -> PAUSING
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  static class PauseContainerTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Ask the launcher to pause the process/process-grp
+      container.dispatcher.getEventHandler().handle(
+          new ContainersLauncherEvent(container,
+              ContainersLauncherEventType.PAUSE_CONTAINER));
+      ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
+      container.addDiagnostics(pauseEvent.getDiagnostic(), "\n");
+    }
+  }
+
+  /**
+   * Transitions upon receiving CONTAINER_PAUSED (PAUSING -> PAUSED).
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  static class PausedContainerTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Container was PAUSED so tell the scheduler
+      container.dispatcher.getEventHandler().handle(
+          new ContainerSchedulerEvent(container,
+              ContainerSchedulerEventType.CONTAINER_PAUSED));
+    }
+  }
+
+  /**
+   * Transitions upon receiving RESUME_CONTAINER.
+   * - PAUSED -> RESUMING
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  static class ResumeContainerTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Ask the launcher to resume the process/process-grp
+      container.dispatcher.getEventHandler().handle(
+          new ContainersLauncherEvent(container,
+              ContainersLauncherEventType.RESUME_CONTAINER));
+      ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
+      container.addDiagnostics(resumeEvent.getDiagnostic(), "\n");
+    }
+  }
+
   @Override
   public void handle(ContainerEvent event) {
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
new file mode 100644
index 0000000..898304e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerPauseEvent.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * ContainerEvent for ContainerEventType.PAUSE_CONTAINER.
+ */
+public class ContainerPauseEvent extends ContainerEvent {
+
+  private final String diagnostic;
+
+  public ContainerPauseEvent(ContainerId cId,
+      String diagnostic) {
+    super(cId, ContainerEventType.PAUSE_CONTAINER);
+    this.diagnostic = diagnostic;
+  }
+
+  public String getDiagnostic() {
+    return this.diagnostic;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
new file mode 100644
index 0000000..d7c9e9a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerResumeEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * ContainerEvent for ContainerEventType.RESUME_CONTAINER.
+ */
+public class ContainerResumeEvent extends ContainerEvent {
+
+  private final String diagnostic;
+
+  public ContainerResumeEvent(ContainerId cId,
+      String diagnostic) {
+    super(cId, ContainerEventType.RESUME_CONTAINER);
+    this.diagnostic = diagnostic;
+  }
+
+  public String getDiagnostic() {
+    return this.diagnostic;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 91d1356..7c3fea8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 public enum ContainerState {
   NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
   REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
-  CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
+  CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
+  PAUSING, PAUSED, RESUMING
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index d0ce787..89dfdd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class ContainerLaunch implements Callable<Integer> {
 
@@ -106,8 +108,10 @@ public class ContainerLaunch implements Callable<Integer> {
   private final Configuration conf;
   private final Context context;
   private final ContainerManagerImpl containerManager;
-  
+
   protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
+  protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false);
+
   protected AtomicBoolean completed = new AtomicBoolean(false);
 
   private volatile boolean killedBeforeStart = false;
@@ -803,6 +807,90 @@ public class ContainerLaunch implements Callable<Integer> {
   }
 
   /**
+   * Pause the container.
+   * Does nothing if a pause has already been requested for this launch.
+   * Otherwise asks the executor to pause the container.
+   * @throws IOException in case of errors.
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  public void pauseContainer() throws IOException {
+    ContainerId containerId = container.getContainerId();
+    String containerIdStr = containerId.toString();
+    LOG.info("Pausing the container " + containerIdStr);
+
+    // The pause event is only handled if the container is in the running state
+    // (the container state machine), so we don't check for
+    // shouldLaunchContainer over here
+
+    if (!shouldPauseContainer.compareAndSet(false, true)) {
+      LOG.info("Container " + containerId + " not paused as "
+          + "pause already called");
+      return;
+    }
+
+    try {
+      // Pause the container
+      exec.pauseContainer(container);
+
+      // pauseContainer is expected to block until done, so reaching this
+      // point means the container is paused; send out the event.
+      dispatcher.getEventHandler().handle(new ContainerEvent(
+          containerId,
+          ContainerEventType.CONTAINER_PAUSED));
+    } catch (Exception e) {
+      String message =
+          "Exception when trying to pause container " + containerIdStr
+              + ": " + StringUtils.stringifyException(e);
+      LOG.info(message);
+      container.handle(new ContainerKillEvent(container.getContainerId(),
+          ContainerExitStatus.PREEMPTED, "Container preempted as there was "
+          + " an exception in pausing it."));
+    }
+  }
+
+  /**
+   * Resume the container.
+   * Does nothing if the container is not currently marked as paused.
+   * Otherwise asks the executor to resume the container.
+   * @throws IOException in case of error.
+   */
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  public void resumeContainer() throws IOException {
+    ContainerId containerId = container.getContainerId();
+    String containerIdStr = containerId.toString();
+    LOG.info("Resuming the container " + containerIdStr);
+
+    // The resume event is only handled if the container is in a paused state
+    // so we don't check for the launched flag here.
+
+    // Clear the paused flag; the swap only succeeds if a pause was recorded
+    boolean wasPaused = shouldPauseContainer.compareAndSet(true, false);
+    if (!wasPaused) {
+      LOG.info("Container " + containerIdStr + " not paused."
+          + " No resume necessary");
+      return;
+    }
+
+    // The container was paused, so ask the executor to resume it
+    try {
+      exec.resumeContainer(container);
+      // resumeContainer is expected to block until done, so reaching this
+      // point means the container is resumed; send out the event.
+      dispatcher.getEventHandler().handle(new ContainerEvent(
+          containerId,
+          ContainerEventType.CONTAINER_RESUMED));
+    } catch (Exception e) {
+      String message =
+          "Exception when trying to resume container " + containerIdStr
+              + ": " + StringUtils.stringifyException(e);
+      LOG.info(message);
+      container.handle(new ContainerKillEvent(container.getContainerId(),
+          ContainerExitStatus.PREEMPTED, "Container preempted as there was "
+          + " an exception in resuming it."));
+    }
+  }
+
+  /**
    * Loop through for a time-bounded interval waiting to
    * read the process id from a file generated by a running process.
    * @param pidFilePath File from which to read the process id

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 25909b9..ca69712 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -171,6 +173,36 @@ public class ContainersLauncher extends AbstractService
               + " with command " + signalEvent.getCommand());
         }
         break;
+      case PAUSE_CONTAINER:
+        ContainerLaunch launchedContainer = running.get(containerId);
+        if (launchedContainer == null) {
+          // Container not launched. So nothing needs to be done.
+          return;
+        }
+
+        // Pause the container
+        try {
+          launchedContainer.pauseContainer();
+        } catch (Exception e) {
+          LOG.info("Got exception while pausing container: " +
+            StringUtils.stringifyException(e));
+        }
+        break;
+      case RESUME_CONTAINER:
+        ContainerLaunch launchCont = running.get(containerId);
+        if (launchCont == null) {
+          // Container not launched. So nothing needs to be done.
+          return;
+        }
+
+        // Resume the container.
+        try {
+          launchCont.resumeContainer();
+        } catch (Exception e) {
+          LOG.info("Got exception while resuming container: " +
+            StringUtils.stringifyException(e));
+        }
+        break;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 380a032..1054e06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -25,4 +25,7 @@ public enum ContainersLauncherEventType {
   CLEANUP_CONTAINER, // The process(grp) itself.
   CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
   SIGNAL_CONTAINER,
+  PAUSE_CONTAINER,
+  RESUME_CONTAINER
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
index 917eda0..a9cbf74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
@@ -27,4 +27,5 @@ public enum ContainerSchedulerEventType {
   UPDATE_CONTAINER,
   // Producer: Node HB response - RM has asked to shed the queue
   SHED_QUEUED_CONTAINERS,
+  CONTAINER_PAUSED
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd51a746/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
index 34bf134..64e6cf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentMatcher;
+import org.mockito.Mockito;
 
 public class TestContainer {
 
@@ -209,6 +210,42 @@ public class TestContainer {
 
   @Test
   @SuppressWarnings("unchecked") // mocked generic
+  public void testContainerPauseAndResume() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(13, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      wc.localizeResources();
+      int running = metrics.getRunningContainers(); // baseline before launch
+      wc.launchContainer();
+      assertEquals(running + 1, metrics.getRunningContainers());
+      reset(wc.localizerBus);
+      wc.pauseContainer(); // container must now report PAUSED
+      assertEquals(ContainerState.PAUSED,
+          wc.c.getContainerState());
+      wc.resumeContainer(); // container must report RUNNING again
+      assertEquals(ContainerState.RUNNING,
+          wc.c.getContainerState());
+      wc.containerKilledOnRequest(); // kill the resumed container
+      assertEquals(ContainerState.EXITED_WITH_FAILURE,
+          wc.c.getContainerState());
+      assertNull(wc.c.getLocalizedResources());
+      verifyCleanupCall(wc);
+      int failed = metrics.getFailedContainers(); // baseline before cleanup
+      wc.containerResourcesCleanup();
+      assertEquals(ContainerState.DONE, wc.c.getContainerState());
+      assertEquals(failed + 1, metrics.getFailedContainers());
+      assertEquals(running, metrics.getRunningContainers());
+    }
+    finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked") // mocked generic
   public void testCleanupOnFailure() throws Exception {
     WrappedContainer wc = null;
     try {
@@ -988,6 +1025,8 @@ public class TestContainer {
       NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
       when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater);
       ContainerExecutor executor = mock(ContainerExecutor.class);
+      Mockito.doNothing().when(executor).pauseContainer(any(Container.class));
+      Mockito.doNothing().when(executor).resumeContainer(any(Container.class));
       launcher =
           new ContainersLauncher(context, dispatcher, executor, null, null);
       // create a mock ExecutorService, which will not really launch
@@ -1196,6 +1235,18 @@ public class TestContainer {
       drainDispatcherEvents();
     }
 
+    public void pauseContainer() {
+      c.handle(new ContainerPauseEvent(cId,
+          "PauseRequest"));
+      drainDispatcherEvents();
+    }
+
+    public void resumeContainer() {
+      c.handle(new ContainerResumeEvent(cId,
+          "ResumeRequest"));
+      drainDispatcherEvents();
+    }
+
     public void containerKilledOnRequest() {
       int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER;
       String diagnosticMsg = "Container completed with exit code " + exitCode;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/4] hadoop git commit: YARN-6059. Update paused container state in the NM state store. (Hitesh Sharma via asuresh)

Posted by as...@apache.org.
YARN-6059. Update paused container state in the NM state store. (Hitesh Sharma via asuresh)

(cherry picked from commit 66ca0a65408521d5f9b080dd16b353b49fb8eaea)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ed855e2e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ed855e2e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ed855e2e

Branch: refs/heads/branch-3.0
Commit: ed855e2eef52320c1667b4f55df84aa827adde70
Parents: 8eebee5
Author: Arun Suresh <as...@apache.org>
Authored: Tue Sep 12 12:22:00 2017 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Sep 28 11:27:43 2017 -0700

----------------------------------------------------------------------
 .../container/ContainerImpl.java                |  16 ++-
 .../launcher/ContainerLaunch.java               |  28 ++++-
 .../launcher/ContainersLauncher.java            |  10 ++
 .../launcher/ContainersLauncherEventType.java   |   3 +-
 .../launcher/RecoverPausedContainerLaunch.java  | 124 +++++++++++++++++++
 .../launcher/RecoveredContainerLaunch.java      |   3 +-
 .../recovery/NMLeveldbStateStoreService.java    |  42 ++++++-
 .../recovery/NMNullStateStoreService.java       |   9 ++
 .../recovery/NMStateStoreService.java           |  26 +++-
 .../recovery/NMMemoryStateStoreService.java     |  13 ++
 .../TestNMLeveldbStateStoreService.java         |  17 +++
 11 files changed, 273 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 9e7a664..8efff31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -834,10 +834,18 @@ public class ContainerImpl implements Container {
 
   @SuppressWarnings("unchecked") // dispatcher not typed
   private void sendScheduleEvent() {
-    dispatcher.getEventHandler().handle(
-        new ContainerSchedulerEvent(this,
-            ContainerSchedulerEventType.SCHEDULE_CONTAINER)
-    );
+    if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
+      // Recovery is not supported for paused container so we raise the
+      // launch event which will proceed to kill the paused container instead
+      // of raising the schedule event.
+      ContainersLauncherEventType launcherEvent;
+      launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
+      dispatcher.getEventHandler()
+          .handle(new ContainersLauncherEvent(this, launcherEvent));
+    } else {
+      dispatcher.getEventHandler().handle(new ContainerSchedulerEvent(this,
+          ContainerSchedulerEventType.SCHEDULE_CONTAINER));
+    }
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 89dfdd1..e254887 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -837,6 +837,14 @@ public class ContainerLaunch implements Callable<Integer> {
       dispatcher.getEventHandler().handle(new ContainerEvent(
           containerId,
           ContainerEventType.CONTAINER_PAUSED));
+
+      try {
+        this.context.getNMStateStore().storeContainerPaused(
+            container.getContainerId());
+      } catch (IOException e) {
+        LOG.warn("Could not store container [" + container.getContainerId()
+            + "] state. The Container has been paused.", e);
+      }
     } catch (Exception e) {
       String message =
           "Exception when trying to pause container " + containerIdStr
@@ -873,12 +881,20 @@ public class ContainerLaunch implements Callable<Integer> {
 
     // If the container has already started
     try {
-        exec.resumeContainer(container);
-        // ResumeContainer is a blocking call. We are here almost means the
-        // container is resumed, so send out the event.
-        dispatcher.getEventHandler().handle(new ContainerEvent(
-            containerId,
-            ContainerEventType.CONTAINER_RESUMED));
+      exec.resumeContainer(container);
+      // ResumeContainer is a blocking call. We are here almost means the
+      // container is resumed, so send out the event.
+      dispatcher.getEventHandler().handle(new ContainerEvent(
+          containerId,
+          ContainerEventType.CONTAINER_RESUMED));
+
+      try {
+        this.context.getNMStateStore().removeContainerPaused(
+            container.getContainerId());
+      } catch (IOException e) {
+        LOG.warn("Could not store container [" + container.getContainerId()
+            + "] state. The Container has been resumed.", e);
+      }
     } catch (Exception e) {
       String message =
           "Exception when trying to resume container " + containerIdStr

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index ca69712..9f6ef74 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -139,6 +139,16 @@ public class ContainersLauncher extends AbstractService
         containerLauncher.submit(launch);
         running.put(containerId, launch);
         break;
+      case RECOVER_PAUSED_CONTAINER:
+        // Recovery for paused containers is not supported, thus here
+        // we locate any paused containers, and terminate them.
+        app = context.getApplications().get(
+            containerId.getApplicationAttemptId().getApplicationId());
+        launch = new RecoverPausedContainerLaunch(context, getConfig(),
+            dispatcher, exec, app, event.getContainer(), dirsHandler,
+            containerManager);
+        containerLauncher.submit(launch);
+        break;
       case CLEANUP_CONTAINER:
       case CLEANUP_CONTAINER_FOR_REINIT:
         ContainerLaunch launcher = running.remove(containerId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
index 1054e06..847ee34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
@@ -26,6 +26,7 @@ public enum ContainersLauncherEventType {
   CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
   SIGNAL_CONTAINER,
   PAUSE_CONTAINER,
-  RESUME_CONTAINER
+  RESUME_CONTAINER,
+  RECOVER_PAUSED_CONTAINER
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
new file mode 100644
index 0000000..14cab9a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoverPausedContainerLaunch.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+
+/**
+ * A ContainerLaunch for a container that was paused when the NM restarted.
+ * Paused containers are not recovered; this launch only kills them.
+ */
+public class RecoverPausedContainerLaunch extends ContainerLaunch {
+
+  private static final Log LOG = LogFactory.getLog(
+      RecoverPausedContainerLaunch.class);
+
+  public RecoverPausedContainerLaunch(Context context,
+      Configuration configuration, Dispatcher dispatcher,
+      ContainerExecutor exec, Application app, Container container,
+      LocalDirsHandlerService dirsHandler,
+      ContainerManagerImpl containerManager) {
+    super(context, configuration, dispatcher, exec, app, container, dirsHandler,
+        containerManager);
+  }
+
+  /**
+   * Cleanup the paused container by issuing a kill on it.
+   * @return the LOST exit code, also stored and sent in the exit event.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Integer call() {
+    int retCode = ContainerExecutor.ExitCode.LOST.getExitCode();
+    ContainerId containerId = container.getContainerId();
+    String appIdStr =
+        containerId.getApplicationAttemptId().getApplicationId().toString();
+    String containerIdStr = containerId.toString();
+
+    boolean notInterrupted = true;
+    try {
+      File pidFile = locatePidFile(appIdStr, containerIdStr);
+      if (pidFile != null) {
+        pidFilePath = new Path(pidFile.getPath());
+        exec.activateContainer(containerId, pidFilePath);
+        exec.signalContainer(new ContainerSignalContext.Builder()
+            .setContainer(container)
+            .setUser(container.getUser())
+            .setSignal(ContainerExecutor.Signal.KILL)
+            .build());
+      } else {
+        LOG.warn("Unable to locate pid file for container " + containerIdStr);
+      }
+    } catch (InterruptedIOException e) {
+      LOG.warn("Interrupted while killing paused container " + containerId);
+      notInterrupted = false;
+    } catch (IOException e) {
+      LOG.error("Unable to kill the paused container " + containerIdStr, e);
+    } finally {
+      // Record completion only when the kill attempt was not interrupted.
+      if (notInterrupted) {
+        this.completed.set(true);
+        exec.deactivateContainer(containerId);
+        try {
+          getContext().getNMStateStore()
+              .storeContainerCompleted(containerId, retCode);
+        } catch (IOException e) {
+          LOG.error("Unable to set exit code for container " + containerId, e);
+        }
+      }
+    }
+
+    LOG.warn("Recovered container exited with a non-zero exit code "
+        + retCode);
+    this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
+        containerId, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode,
+        "Container exited with a non-zero exit code " + retCode));
+
+    return retCode;
+  }
+
+  // Returns the first existing pid file in the readable local dirs, or null.
+  private File locatePidFile(String appIdStr, String containerIdStr) {
+    String pidSubpath = getPidFileSubpath(appIdStr, containerIdStr);
+    for (String dir : getContext().getLocalDirsHandler().
+        getLocalDirsForRead()) {
+      File pidFile = new File(dir, pidSubpath);
+      if (pidFile.exists()) {
+        return pidFile;
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
index 2eba0df..17ddd77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/RecoveredContainerLaunch.java
@@ -40,10 +40,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
 
-
 /**
  * This is a ContainerLaunch which has been recovered after an NM restart (for
- * rolling upgrades)
+ * rolling upgrades).
  */
 public class RecoveredContainerLaunch extends ContainerLaunch {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
index a31756e..631af88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java
@@ -116,6 +116,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
   private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
   private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
+  private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused";
   private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
       "/resourceChanged";
   private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
@@ -266,9 +267,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         if (rcs.status == RecoveredContainerStatus.REQUESTED) {
           rcs.status = RecoveredContainerStatus.QUEUED;
         }
+      } else if (suffix.equals(CONTAINER_PAUSED_KEY_SUFFIX)) {
+        if ((rcs.status == RecoveredContainerStatus.LAUNCHED)
+            ||(rcs.status == RecoveredContainerStatus.QUEUED)
+            ||(rcs.status == RecoveredContainerStatus.REQUESTED)) {
+          rcs.status = RecoveredContainerStatus.PAUSED;
+        }
       } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
         if ((rcs.status == RecoveredContainerStatus.REQUESTED)
-            || (rcs.status == RecoveredContainerStatus.QUEUED)) {
+            || (rcs.status == RecoveredContainerStatus.QUEUED)
+            ||(rcs.status == RecoveredContainerStatus.PAUSED)) {
           rcs.status = RecoveredContainerStatus.LAUNCHED;
         }
       } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
@@ -354,6 +362,37 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
   }
 
   @Override
+  public void storeContainerPaused(ContainerId containerId) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("storeContainerPaused: containerId=" + containerId);
+    }
+
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_PAUSED_KEY_SUFFIX;
+    try {
+      db.put(bytes(key), EMPTY_VALUE); // "/paused" marker key, empty value
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeContainerPaused(ContainerId containerId)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("removeContainerPaused: containerId=" + containerId);
+    }
+
+    String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+        + CONTAINER_PAUSED_KEY_SUFFIX;
+    try {
+      db.delete(bytes(key)); // removes the container's "/paused" marker key
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -497,6 +536,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
         batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
+        batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
         batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
         List<String> unknownKeysForContainer = containerUnknownKeySuffixes

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
index 86dc99f..155ee8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java
@@ -79,6 +79,15 @@ public class NMNullStateStoreService extends NMStateStoreService {
   }
 
   @Override
+  public void storeContainerPaused(ContainerId containerId) throws IOException {
+  }
+
+  @Override
+  public void removeContainerPaused(ContainerId containerId)
+      throws IOException {
+  }
+
+  @Override
   public void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
index ec534bf..7e2debf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java
@@ -71,7 +71,8 @@ public abstract class NMStateStoreService extends AbstractService {
     REQUESTED,
     QUEUED,
     LAUNCHED,
-    COMPLETED
+    COMPLETED,
+    PAUSED
   }
 
   public static class RecoveredContainerState {
@@ -338,9 +339,9 @@ public abstract class NMStateStoreService extends AbstractService {
   }
 
   /**
-   * Load the state of applications
-   * @return recovered state for applications
-   * @throws IOException
+   * Load the state of applications.
+   * @return recovered state for applications.
+   * @throws IOException IO Exception.
    */
   public abstract RecoveredApplicationsState loadApplicationsState()
       throws IOException;
@@ -392,6 +393,23 @@ public abstract class NMStateStoreService extends AbstractService {
       throws IOException;
 
   /**
+   * Record that a container has been paused at the NM.
+   * @param containerId the container ID.
+   * @throws IOException IO Exception.
+   */
+  public abstract void storeContainerPaused(ContainerId containerId)
+      throws IOException;
+
+  /**
+   * Record that a container has been resumed at the NM by removing the
+   * fact that it has been paused.
+   * @param containerId the container ID.
+   * @throws IOException IO Exception.
+   */
+  public abstract void removeContainerPaused(ContainerId containerId)
+      throws IOException;
+
+  /**
    * Record that a container has been launched
    * @param containerId the container ID
    * @throws IOException

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
index c1638df..96893a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java
@@ -142,6 +142,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
   }
 
   @Override
+  public void storeContainerPaused(ContainerId containerId) throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.PAUSED;
+  }
+
+  @Override
+  public void removeContainerPaused(ContainerId containerId)
+      throws IOException {
+    RecoveredContainerState rcs = getRecoveredContainerState(containerId);
+    rcs.status = RecoveredContainerStatus.LAUNCHED;
+  }
+
+  @Override
   public synchronized void storeContainerDiagnostics(ContainerId containerId,
       StringBuilder diagnostics) throws IOException {
     RecoveredContainerState rcs = getRecoveredContainerState(containerId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ed855e2e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
index b0a9bc9..76fe130 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java
@@ -288,6 +288,23 @@ public class TestNMLeveldbStateStoreService {
     assertEquals(containerReq, rcs.getStartRequest());
     assertEquals(diags.toString(), rcs.getDiagnostics());
 
+    // pause the container, and verify recovered
+    stateStore.storeContainerPaused(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+    rcs = recoveredContainers.get(0);
+    assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
+    assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
+    assertEquals(false, rcs.getKilled());
+    assertEquals(containerReq, rcs.getStartRequest());
+
+    // Resume the container
+    stateStore.removeContainerPaused(containerId);
+    restartStateStore();
+    recoveredContainers = stateStore.loadContainersState();
+    assertEquals(1, recoveredContainers.size());
+
     // increase the container size, and verify recovered
     stateStore.storeContainerResourceChanged(containerId, 2,
         Resource.newInstance(2468, 4));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[4/4] hadoop git commit: YARN-7240. Add more states and transitions to stabilize the NM Container state machine. (Kartheek Muthyala via asuresh)

Posted by as...@apache.org.
YARN-7240. Add more states and transitions to stabilize the NM Container state machine. (Kartheek Muthyala via asuresh)

(cherry picked from commit df800f6cf3ea663daf4081ebe784808b08d9366d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/59453dad
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/59453dad
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/59453dad

Branch: refs/heads/branch-3.0
Commit: 59453dad8c39590dc16bef5cdf4fdae6618c9fa2
Parents: ed855e2
Author: Arun Suresh <as...@apache.org>
Authored: Mon Sep 25 14:11:55 2017 -0700
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Sep 28 11:29:26 2017 -0700

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  |  41 +---
 .../container/ContainerEventType.java           |   6 +-
 .../container/ContainerImpl.java                | 174 ++++++++++++--
 .../container/ContainerState.java               |   3 +-
 .../container/UpdateContainerTokenEvent.java    |  86 +++++++
 .../scheduler/ContainerScheduler.java           | 114 ++++-----
 .../UpdateContainerSchedulerEvent.java          |  46 ++--
 .../BaseContainerManagerTest.java               |   2 +
 .../containermanager/TestContainerManager.java  | 229 ++++++++++++++++++-
 .../TestContainerSchedulerQueuing.java          | 101 ++++++++
 10 files changed, 660 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index e497f62..d12892e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -144,7 +145,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
 
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@@ -1251,29 +1251,6 @@ public class ContainerManagerImpl extends CompositeService implements
           + " [" + containerTokenIdentifier.getVersion() + "]");
     }
 
-    // Check container state
-    org.apache.hadoop.yarn.server.nodemanager.
-        containermanager.container.ContainerState currentState =
-        container.getContainerState();
-    EnumSet<org.apache.hadoop.yarn.server.nodemanager.containermanager
-        .container.ContainerState> allowedStates = EnumSet.of(
-        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
-            .ContainerState.RUNNING,
-        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
-            .ContainerState.SCHEDULED,
-        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
-            .ContainerState.LOCALIZING,
-        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
-            .ContainerState.REINITIALIZING,
-        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
-            .ContainerState.RELAUNCHING);
-    if (!allowedStates.contains(currentState)) {
-      throw RPCUtil.getRemoteException("Container " + containerId.toString()
-          + " is in " + currentState.name() + " state."
-          + " Resource can only be changed when a container is in"
-          + " RUNNING or SCHEDULED state");
-    }
-
     // Check validity of the target resource.
     Resource currentResource = container.getResource();
     ExecutionType currentExecType =
@@ -1313,11 +1290,11 @@ public class ContainerManagerImpl extends CompositeService implements
     this.readLock.lock();
     try {
       if (!serviceStopped) {
-        // Dispatch message to ContainerScheduler to actually
+        // Dispatch message to Container to actually
         // make the change.
-        dispatcher.getEventHandler().handle(new UpdateContainerSchedulerEvent(
-            container, containerTokenIdentifier, isResourceChange,
-            isExecTypeUpdate, isIncrease));
+        dispatcher.getEventHandler().handle(new UpdateContainerTokenEvent(
+            container.getContainerId(), containerTokenIdentifier,
+            isResourceChange, isExecTypeUpdate, isIncrease));
       } else {
         throw new YarnException(
             "Unable to change container resource as the NodeManager is "
@@ -1816,10 +1793,14 @@ public class ContainerManagerImpl extends CompositeService implements
     if (container == null) {
       throw new YarnException("Specified " + containerId + " does not exist!");
     }
-    if (!container.isRunning() || container.isReInitializing()) {
+    if (!container.isRunning() || container.isReInitializing()
+        || container.getContainerTokenIdentifier().getExecutionType()
+        == ExecutionType.OPPORTUNISTIC) {
       throw new YarnException("Cannot perform " + op + " on [" + containerId
           + "]. Current state is [" + container.getContainerState() + ", " +
-          "isReInitializing=" + container.isReInitializing() + "].");
+          "isReInitializing=" + container.isReInitializing() + "]. Container"
+          + " Execution Type is [" + container.getContainerTokenIdentifier()
+          .getExecutionType() + "].");
     }
     return container;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
index 1475435..e28b37d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
@@ -29,6 +29,7 @@ public enum ContainerEventType {
   ROLLBACK_REINIT,
   PAUSE_CONTAINER,
   RESUME_CONTAINER,
+  UPDATE_CONTAINER_TOKEN,
 
   // DownloadManager
   CONTAINER_INITED,
@@ -42,5 +43,8 @@ public enum ContainerEventType {
   CONTAINER_EXITED_WITH_FAILURE,
   CONTAINER_KILLED_ON_REQUEST,
   CONTAINER_PAUSED,
-  CONTAINER_RESUMED
+  CONTAINER_RESUMED,
+
+  // Producer: ContainerScheduler
+  CONTAINER_TOKEN_UPDATED
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 8efff31..b7a13b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -33,6 +33,8 @@ import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -305,8 +307,8 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.NEW, ContainerState.DONE,
         ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
-    .addTransition(ContainerState.NEW, ContainerState.DONE,
-            ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
+    .addTransition(ContainerState.NEW, ContainerState.NEW,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
 
     // From LOCALIZING State
     .addTransition(ContainerState.LOCALIZING,
@@ -322,8 +324,9 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER,
         new KillBeforeRunningTransition())
-    .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
-        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
+    .addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
+
 
     // From LOCALIZATION_FAILED State
     .addTransition(ContainerState.LOCALIZATION_FAILED,
@@ -348,6 +351,9 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.LOCALIZATION_FAILED,
         ContainerState.LOCALIZATION_FAILED,
         ContainerEventType.RESOURCE_FAILED)
+    .addTransition(ContainerState.LOCALIZATION_FAILED,
+        ContainerState.LOCALIZATION_FAILED,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
 
     // From SCHEDULED State
     .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
@@ -361,6 +367,9 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER,
         new KillBeforeRunningTransition())
+    .addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN,
+        new NotifyContainerSchedulerOfUpdateTransition())
 
     // From RUNNING State
     .addTransition(ContainerState.RUNNING,
@@ -373,10 +382,16 @@ public class ContainerImpl implements Container {
             ContainerState.EXITED_WITH_FAILURE),
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new RetryFailureTransition())
-    .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
+    .addTransition(ContainerState.RUNNING,
+        EnumSet.of(ContainerState.RUNNING,
+            ContainerState.REINITIALIZING,
+            ContainerState.REINITIALIZING_AWAITING_KILL),
         ContainerEventType.REINITIALIZE_CONTAINER,
         new ReInitializeContainerTransition())
-    .addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
+    .addTransition(ContainerState.RUNNING,
+        EnumSet.of(ContainerState.RUNNING,
+            ContainerState.REINITIALIZING,
+            ContainerState.REINITIALIZING_AWAITING_KILL),
         ContainerEventType.ROLLBACK_REINIT,
         new RollbackContainerTransition())
     .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
@@ -395,9 +410,16 @@ public class ContainerImpl implements Container {
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
         new KilledExternallyTransition())
     .addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
-    ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
+        ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
+    .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN,
+        new NotifyContainerSchedulerOfUpdateTransition())
+
 
     // From PAUSING State
+    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        new ResourceLocalizedWhileRunningTransition())
     .addTransition(ContainerState.PAUSING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
     .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
@@ -417,6 +439,12 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
         new KilledExternallyTransition())
+    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        new ResourceLocalizedWhileRunningTransition())
+    .addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN,
+        new NotifyContainerSchedulerOfUpdateTransition())
 
     // From PAUSED State
     .addTransition(ContainerState.PAUSED, ContainerState.KILLING,
@@ -426,6 +454,10 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
         ContainerEventType.PAUSE_CONTAINER)
+    // This can happen during re-initialization.
+    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        new ResourceLocalizedWhileRunningTransition())
     .addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
         ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
     // In case something goes wrong then container will exit from the
@@ -441,6 +473,9 @@ public class ContainerImpl implements Container {
         ContainerState.EXITED_WITH_SUCCESS,
         ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
         new ExitedWithSuccessTransition(true))
+    .addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN,
+        new NotifyContainerSchedulerOfUpdateTransition())
 
     // From RESUMING State
     .addTransition(ContainerState.RESUMING, ContainerState.KILLING,
@@ -450,6 +485,10 @@ public class ContainerImpl implements Container {
     .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
         ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
         UPDATE_DIAGNOSTICS_TRANSITION)
+    // This can happen during re-initialization
+    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        new ResourceLocalizedWhileRunningTransition())
     // In case something goes wrong then container will exit from the
     // RESUMING state
     .addTransition(ContainerState.RESUMING,
@@ -464,6 +503,10 @@ public class ContainerImpl implements Container {
         ContainerState.EXITED_WITH_SUCCESS,
         ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
         new ExitedWithSuccessTransition(true))
+    .addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN,
+        new NotifyContainerSchedulerOfUpdateTransition())
+    // NOTE - We cannot get a PAUSE_CONTAINER while in RESUMING state.
 
     // From REINITIALIZING State
     .addTransition(ContainerState.REINITIALIZING,
@@ -475,7 +518,8 @@ public class ContainerImpl implements Container {
         ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
         new ExitedWithFailureTransition(true))
     .addTransition(ContainerState.REINITIALIZING,
-        ContainerState.REINITIALIZING,
+        EnumSet.of(ContainerState.REINITIALIZING,
+            ContainerState.REINITIALIZING_AWAITING_KILL),
         ContainerEventType.RESOURCE_LOCALIZED,
         new ResourceLocalizedWhileReInitTransition())
     .addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
@@ -487,12 +531,39 @@ public class ContainerImpl implements Container {
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
-    .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
-        ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
+    .addTransition(ContainerState.REINITIALIZING, ContainerState.PAUSING,
+        ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
     .addTransition(ContainerState.REINITIALIZING,
+        ContainerState.REINITIALIZING,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN,
+        new NotifyContainerSchedulerOfUpdateTransition())
+
+    // From REINITIALIZING_AWAITING_KILL State
+    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+        new ExitedWithSuccessTransition(true))
+    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition(true))
+    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+        ContainerState.REINITIALIZING_AWAITING_KILL,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        UPDATE_DIAGNOSTICS_TRANSITION)
+    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+        ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER, new KillTransition())
+    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+        ContainerState.SCHEDULED, ContainerEventType.PAUSE_CONTAINER)
+    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
         ContainerState.SCHEDULED,
         ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
         new KilledForReInitializationTransition())
+    .addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
+        ContainerState.REINITIALIZING_AWAITING_KILL,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN,
+        new NotifyContainerSchedulerOfUpdateTransition())
 
     // From RELAUNCHING State
     .addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
@@ -508,6 +579,10 @@ public class ContainerImpl implements Container {
         ContainerEventType.KILL_CONTAINER, new KillTransition())
     .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
         ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
+    .addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN,
+        new NotifyContainerSchedulerOfUpdateTransition())
+
 
     // From CONTAINER_EXITED_WITH_SUCCESS State
     .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
@@ -521,6 +596,10 @@ public class ContainerImpl implements Container {
         ContainerState.EXITED_WITH_SUCCESS,
         EnumSet.of(ContainerEventType.KILL_CONTAINER,
             ContainerEventType.PAUSE_CONTAINER))
+    // No transition - assuming container is on its way to completion
+    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN)
 
     // From EXITED_WITH_FAILURE State
     .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@@ -534,6 +613,10 @@ public class ContainerImpl implements Container {
                    ContainerState.EXITED_WITH_FAILURE,
         EnumSet.of(ContainerEventType.KILL_CONTAINER,
             ContainerEventType.PAUSE_CONTAINER))
+    // No transition - assuming container is on its way to completion
+    .addTransition(ContainerState.EXITED_WITH_FAILURE,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN)
 
     // From KILLING State.
     .addTransition(ContainerState.KILLING,
@@ -569,6 +652,9 @@ public class ContainerImpl implements Container {
         ContainerState.KILLING,
         EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
             ContainerEventType.PAUSE_CONTAINER))
+    // No transition - assuming container is on its way to completion
+    .addTransition(ContainerState.KILLING, ContainerState.KILLING,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN)
 
     // From CONTAINER_CLEANEDUP_AFTER_KILL State.
     .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -586,6 +672,10 @@ public class ContainerImpl implements Container {
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
             ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
             ContainerEventType.PAUSE_CONTAINER))
+    // No transition - assuming container is on its way to completion
+    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN)
 
     // From DONE
     .addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -603,6 +693,9 @@ public class ContainerImpl implements Container {
         EnumSet.of(ContainerEventType.RESOURCE_FAILED,
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
             ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
+    // No transition - assuming container is on its way to completion
+    .addTransition(ContainerState.DONE, ContainerState.DONE,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN)
 
     // create the topology tables
     .installTopology();
@@ -622,6 +715,7 @@ public class ContainerImpl implements Container {
     case RUNNING:
     case RELAUNCHING:
     case REINITIALIZING:
+    case REINITIALIZING_AWAITING_KILL:
     case EXITED_WITH_SUCCESS:
     case EXITED_WITH_FAILURE:
     case KILLING:
@@ -925,6 +1019,45 @@ public class ContainerImpl implements Container {
 
   }
 
+  static class UpdateTransition extends ContainerTransition {
+    @Override
+    public void transition(
+        ContainerImpl container, ContainerEvent event) {
+      UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event;
+      // Update the container token
+      container.setContainerTokenIdentifier(updateEvent.getUpdatedToken());
+      if (updateEvent.isResourceChange()) {
+        try {
+          // Persist change in the state store.
+          container.context.getNMStateStore().storeContainerResourceChanged(
+              container.containerId,
+              container.getContainerTokenIdentifier().getVersion(),
+              container.getResource());
+        } catch (IOException e) {
+          LOG.warn("Could not store container [" + container.containerId
+              + "] resource change..", e);
+        }
+      }
+    }
+  }
+
+  static class NotifyContainerSchedulerOfUpdateTransition extends
+      UpdateTransition {
+    @Override
+    public void transition(
+        ContainerImpl container, ContainerEvent event) {
+
+      UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event;
+      // Save original token
+      ContainerTokenIdentifier originalToken =
+          container.containerTokenIdentifier;
+      super.transition(container, updateEvent);
+      container.dispatcher.getEventHandler().handle(
+          new UpdateContainerSchedulerEvent(container,
+              originalToken, updateEvent));
+    }
+  }
+
   /**
    * State transition when a NEW container receives the INIT_CONTAINER
    * message.
@@ -1070,12 +1203,15 @@ public class ContainerImpl implements Container {
   /**
    * Transition to start the Re-Initialization process.
    */
-  static class ReInitializeContainerTransition extends ContainerTransition {
+  static class ReInitializeContainerTransition implements
+      MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void transition(ContainerImpl container, ContainerEvent event) {
+    public ContainerState transition(
+        ContainerImpl container, ContainerEvent event) {
       container.reInitContext = createReInitContext(container, event);
+      boolean resourcesPresent = false;
       try {
         // 'reInitContext.newResourceSet' can be
         // a) current container resourceSet (In case of Restart)
@@ -1097,6 +1233,7 @@ public class ContainerImpl implements Container {
           container.dispatcher.getEventHandler().handle(
               new ContainersLauncherEvent(container,
                   ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
+          resourcesPresent = true;
         }
         container.metrics.reInitingContainer();
         NMAuditLogger.logSuccess(container.user,
@@ -1108,7 +1245,11 @@ public class ContainerImpl implements Container {
             " re-initialization failure..", e);
         container.addDiagnostics("Error re-initializing due to" +
             "[" + e.getMessage() + "]");
+        return ContainerState.RUNNING;
       }
+      return resourcesPresent ?
+          ContainerState.REINITIALIZING_AWAITING_KILL :
+          ContainerState.REINITIALIZING;
     }
 
     protected ReInitializationContext createReInitContext(
@@ -1160,11 +1301,14 @@ public class ContainerImpl implements Container {
    * If all dependencies are met, then restart Container with new bits.
    */
   static class ResourceLocalizedWhileReInitTransition
-      extends ContainerTransition {
+      implements MultipleArcTransition
+      <ContainerImpl, ContainerEvent, ContainerState> {
+
 
     @SuppressWarnings("unchecked")
     @Override
-    public void transition(ContainerImpl container, ContainerEvent event) {
+    public ContainerState transition(
+        ContainerImpl container, ContainerEvent event) {
       ContainerResourceLocalizedEvent rsrcEvent =
           (ContainerResourceLocalizedEvent) event;
       container.reInitContext.newResourceSet.resourceLocalized(
@@ -1176,7 +1320,9 @@ public class ContainerImpl implements Container {
         container.dispatcher.getEventHandler().handle(
             new ContainersLauncherEvent(container,
                 ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
+        return ContainerState.REINITIALIZING_AWAITING_KILL;
       }
+      return ContainerState.REINITIALIZING;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
index 7c3fea8..5644d03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
@@ -20,7 +20,8 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
 public enum ContainerState {
   NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
-  REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
+  REINITIALIZING, REINITIALIZING_AWAITING_KILL,
+  EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
   CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
   PAUSING, PAUSED, RESUMING
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java
new file mode 100644
index 0000000..c9dc97e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/UpdateContainerTokenEvent.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+
+/**
+ * Update Event consumed by the Container.
+ */
+public class UpdateContainerTokenEvent extends ContainerEvent {
+  private final ContainerTokenIdentifier updatedToken;
+  private final boolean isResourceChange;
+  private final boolean isExecTypeUpdate;
+  private final boolean isIncrease;
+
+  /**
+   * Create Update event.
+   *
+   * @param cID Container Id.
+   * @param updatedToken Updated Container Token.
+   * @param isResourceChange Is Resource change.
+   * @param isExecTypeUpdate Is ExecutionType Update.
+   * @param isIncrease Is container increase.
+   */
+  public UpdateContainerTokenEvent(ContainerId cID,
+      ContainerTokenIdentifier updatedToken, boolean isResourceChange,
+      boolean isExecTypeUpdate, boolean isIncrease) {
+    super(cID, ContainerEventType.UPDATE_CONTAINER_TOKEN);
+    this.updatedToken = updatedToken;
+    this.isResourceChange = isResourceChange;
+    this.isExecTypeUpdate = isExecTypeUpdate;
+    this.isIncrease = isIncrease;
+  }
+
+  /**
+   * Get the updated Container Token.
+   *
+   * @return Container Token.
+   */
+  public ContainerTokenIdentifier getUpdatedToken() {
+    return updatedToken;
+  }
+
+  /**
+   * Is this update a ResourceChange.
+   *
+   * @return isResourceChange.
+   */
+  public boolean isResourceChange() {
+    return isResourceChange;
+  }
+
+  /**
+   * Is this update an ExecType Update.
+   *
+   * @return isExecTypeUpdate.
+   */
+  public boolean isExecTypeUpdate() {
+    return isExecTypeUpdate;
+  }
+
+  /**
+   * Is this a container Increase.
+   *
+   * @return isIncrease.
+   */
+  public boolean isIncrease() {
+    return isIncrease;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 830a06d..e436822 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
     .ChangeMonitoringContainerResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -151,7 +152,9 @@ public class ContainerScheduler extends AbstractService implements
     case SCHEDULE_CONTAINER:
       scheduleContainer(event.getContainer());
       break;
+    // NOTE: Sent only after the container state has changed to PAUSED.
     case CONTAINER_PAUSED:
+    // NOTE: Sent only after the container state has changed to DONE.
     case CONTAINER_COMPLETED:
       onResourcesReclaimed(event.getContainer());
       break;
@@ -180,58 +183,38 @@ public class ContainerScheduler extends AbstractService implements
     if (updateEvent.isResourceChange()) {
       if (runningContainers.containsKey(containerId)) {
         this.utilizationTracker.subtractContainerResource(
-            updateEvent.getContainer());
-        updateEvent.getContainer().setContainerTokenIdentifier(
-            updateEvent.getUpdatedToken());
+            new ContainerImpl(getConfig(), null, null, null, null,
+                updateEvent.getOriginalToken(), context));
         this.utilizationTracker.addContainerResources(
             updateEvent.getContainer());
         getContainersMonitor().handle(
             new ChangeMonitoringContainerResourceEvent(containerId,
                 updateEvent.getUpdatedToken().getResource()));
-      } else {
-        // Is Queued or localizing..
-        updateEvent.getContainer().setContainerTokenIdentifier(
-            updateEvent.getUpdatedToken());
-      }
-      try {
-        // Persist change in the state store.
-        this.context.getNMStateStore().storeContainerResourceChanged(
-            containerId,
-            updateEvent.getUpdatedToken().getVersion(),
-            updateEvent.getUpdatedToken().getResource());
-      } catch (IOException e) {
-        LOG.warn("Could not store container [" + containerId + "] resource " +
-            "change..", e);
       }
     }
 
     if (updateEvent.isExecTypeUpdate()) {
-      updateEvent.getContainer().setContainerTokenIdentifier(
-          updateEvent.getUpdatedToken());
-      // If this is a running container.. just change the execution type
-      // and be done with it.
-      if (!runningContainers.containsKey(containerId)) {
-        // Promotion or not (Increase signifies either a promotion
-        // or container size increase)
-        if (updateEvent.isIncrease()) {
-          // Promotion of queued container..
-          if (queuedOpportunisticContainers.remove(containerId) != null) {
-            queuedGuaranteedContainers.put(containerId,
-                updateEvent.getContainer());
-          }
+      // Promotion or not (Increase signifies either a promotion
+      // or container size increase)
+      if (updateEvent.isIncrease()) {
+        // Promotion of queued container..
+        if (queuedOpportunisticContainers.remove(containerId) != null) {
+          queuedGuaranteedContainers.put(containerId,
+              updateEvent.getContainer());
           //Kill/pause opportunistic containers if any to make room for
           // promotion request
           reclaimOpportunisticContainerResources(updateEvent.getContainer());
-        } else {
-          // Demotion of queued container.. Should not happen too often
-          // since you should not find too many queued guaranteed
-          // containers
-          if (queuedGuaranteedContainers.remove(containerId) != null) {
-            queuedOpportunisticContainers.put(containerId,
-                updateEvent.getContainer());
-          }
+        }
+      } else {
+        // Demotion of queued container.. Should not happen too often
+        // since you should not find too many queued guaranteed
+        // containers
+        if (queuedGuaranteedContainers.remove(containerId) != null) {
+          queuedOpportunisticContainers.put(containerId,
+              updateEvent.getContainer());
         }
       }
+      startPendingContainers(maxOppQueueLength <= 0);
     }
   }
 
@@ -290,6 +273,16 @@ public class ContainerScheduler extends AbstractService implements
       queuedGuaranteedContainers.remove(container.getContainerId());
     }
 
+    // Requeue PAUSED containers
+    if (container.getContainerState() == ContainerState.PAUSED) {
+      if (container.getContainerTokenIdentifier().getExecutionType() ==
+          ExecutionType.GUARANTEED) {
+        queuedGuaranteedContainers.put(container.getContainerId(), container);
+      } else {
+        queuedOpportunisticContainers.put(
+            container.getContainerId(), container);
+      }
+    }
     // decrement only if it was a running container
     Container completedContainer = runningContainers.remove(container
         .getContainerId());
@@ -301,7 +294,8 @@ public class ContainerScheduler extends AbstractService implements
           ExecutionType.OPPORTUNISTIC) {
         this.metrics.completeOpportunisticContainer(container.getResource());
       }
-      startPendingContainers(false);
+      boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
+      startPendingContainers(forceStartGuaranteedContainers);
     }
   }
 
@@ -311,26 +305,9 @@ public class ContainerScheduler extends AbstractService implements
    *        container without looking at available resource
    */
   private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
-    // Start pending guaranteed containers, if resources available.
+    // Start queued/paused guaranteed containers, if resources available.
     boolean resourcesAvailable = startContainers(
-        queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
-    // Resume opportunistic containers, if resource available.
-    if (resourcesAvailable) {
-      List<Container> pausedContainers = new ArrayList<Container>();
-      Map<ContainerId, Container> containers =
-          context.getContainers();
-      for (Map.Entry<ContainerId, Container>entry : containers.entrySet()) {
-        ContainerId contId = entry.getKey();
-        // Find containers that were not already started and are in paused state
-        if(false == runningContainers.containsKey(contId)) {
-          if(containers.get(contId).getContainerState()
-              == ContainerState.PAUSED) {
-            pausedContainers.add(containers.get(contId));
-          }
-        }
-      }
-      resourcesAvailable = startContainers(pausedContainers, false);
-    }
+          queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
     // Start opportunistic containers, if resources available.
     if (resourcesAvailable) {
       startContainers(queuedOpportunisticContainers.values(), false);
@@ -590,16 +567,19 @@ public class ContainerScheduler extends AbstractService implements
         queuedOpportunisticContainers.values().iterator();
     while (containerIter.hasNext()) {
       Container container = containerIter.next();
-      if (numAllowed <= 0) {
-        container.sendKillEvent(
-            ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
-            "Container De-queued to meet NM queuing limits.");
-        containerIter.remove();
-        LOG.info(
-            "Opportunistic container {} will be killed to meet NM queuing" +
-                " limits.", container.getContainerId());
+      // Do not shed PAUSED containers
+      if (container.getContainerState() != ContainerState.PAUSED) {
+        if (numAllowed <= 0) {
+          container.sendKillEvent(
+              ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+              "Container De-queued to meet NM queuing limits.");
+          containerIter.remove();
+          LOG.info(
+              "Opportunistic container {} will be killed to meet NM queuing" +
+                  " limits.", container.getContainerId());
+        }
+        numAllowed--;
       }
-      numAllowed--;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
index 5384b7e..2473982 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
@@ -21,33 +21,37 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
     .Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
+
 /**
  * Update Event consumed by the {@link ContainerScheduler}.
  */
 public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
 
-  private ContainerTokenIdentifier updatedToken;
-  private boolean isResourceChange;
-  private boolean isExecTypeUpdate;
-  private boolean isIncrease;
+  private final UpdateContainerTokenEvent containerEvent;
+  private final ContainerTokenIdentifier originalToken;
 
   /**
    * Create instance of Event.
    *
-   * @param originalContainer Original Container.
-   * @param updatedToken Updated Container Token.
-   * @param isResourceChange is this a Resource Change.
-   * @param isExecTypeUpdate is this an ExecTypeUpdate.
-   * @param isIncrease is this a Container Increase.
+   * @param container Container.
+   * @param origToken The Original Container Token.
+   * @param event The Container Event.
+   */
+  public UpdateContainerSchedulerEvent(Container container,
+      ContainerTokenIdentifier origToken, UpdateContainerTokenEvent event) {
+    super(container, ContainerSchedulerEventType.UPDATE_CONTAINER);
+    this.containerEvent = event;
+    this.originalToken = origToken;
+  }
+
+  /**
+   * Original Token before update.
+   *
+   * @return Container Token.
    */
-  public UpdateContainerSchedulerEvent(Container originalContainer,
-      ContainerTokenIdentifier updatedToken, boolean isResourceChange,
-      boolean isExecTypeUpdate, boolean isIncrease) {
-    super(originalContainer, ContainerSchedulerEventType.UPDATE_CONTAINER);
-    this.updatedToken = updatedToken;
-    this.isResourceChange = isResourceChange;
-    this.isExecTypeUpdate = isExecTypeUpdate;
-    this.isIncrease = isIncrease;
+  public ContainerTokenIdentifier getOriginalToken() {
+    return this.originalToken;
   }
 
   /**
@@ -56,7 +60,7 @@ public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
    * @return Container Token.
    */
   public ContainerTokenIdentifier getUpdatedToken() {
-    return updatedToken;
+    return containerEvent.getUpdatedToken();
   }
 
   /**
@@ -64,7 +68,7 @@ public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
    * @return isResourceChange.
    */
   public boolean isResourceChange() {
-    return isResourceChange;
+    return containerEvent.isResourceChange();
   }
 
   /**
@@ -72,7 +76,7 @@ public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
    * @return isExecTypeUpdate.
    */
   public boolean isExecTypeUpdate() {
-    return isExecTypeUpdate;
+    return containerEvent.isExecTypeUpdate();
   }
 
   /**
@@ -80,6 +84,6 @@ public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
    * @return isIncrease.
    */
   public boolean isIncrease() {
-    return isIncrease;
+    return containerEvent.isIncrease();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 3cafcbd..fc9e6c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -208,6 +208,8 @@ public abstract class BaseContainerManagerTest {
     containerManager.init(conf);
     nodeStatusUpdater.start();
     ((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
+    ((NMContext)context).setContainerStateTransitionListener(
+        new NodeManager.DefaultContainerStateListener());
   }
 
   protected ContainerManagerImpl

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 38df208..6e8c005 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -90,12 +90,16 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
@@ -119,6 +123,41 @@ public class TestContainerManager extends BaseContainerManagerTest {
     LOG = LoggerFactory.getLogger(TestContainerManager.class);
   }
 
+  private static class Listener implements ContainerStateTransitionListener {
+
+    private final Map<ContainerId,
+        List<org.apache.hadoop.yarn.server.nodemanager.containermanager.
+            container.ContainerState>> states = new HashMap<>();
+    private final Map<ContainerId, List<ContainerEventType>> events =
+        new HashMap<>();
+
+    @Override
+    public void init(Context context) {}
+
+    @Override
+    public void preTransition(ContainerImpl op,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState beforeState,
+        ContainerEvent eventToBeProcessed) {
+      if (!states.containsKey(op.getContainerId())) {
+        states.put(op.getContainerId(), new ArrayList<>());
+        states.get(op.getContainerId()).add(beforeState);
+        events.put(op.getContainerId(), new ArrayList<>());
+      }
+    }
+
+    @Override
+    public void postTransition(ContainerImpl op,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState beforeState,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState afterState,
+        ContainerEvent processedEvent) {
+      states.get(op.getContainerId()).add(afterState);
+      events.get(op.getContainerId()).add(processedEvent.getType());
+    }
+  }
+
   private boolean delayContainers = false;
 
   @Override
@@ -144,7 +183,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
   @Override
   protected ContainerManagerImpl
       createContainerManager(DeletionService delSrvc) {
-    return new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
+    return  new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
       metrics, dirsHandler) {
 
       @Override
@@ -496,6 +535,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
   @Test
   public void testContainerUpgradeSuccessAutoCommit() throws IOException,
       InterruptedException, YarnException {
+    Listener listener = new Listener();
+    ((NodeManager.DefaultContainerStateListener)containerManager.context.
+        getContainerStateTransitionListener()).addListener(listener);
     testContainerReInitSuccess(true);
     // Should not be able to Commit (since already auto committed)
     try {
@@ -504,6 +546,41 @@ public class TestContainerManager extends BaseContainerManagerTest {
     } catch (Exception e) {
       Assert.assertTrue(e.getMessage().contains("Nothing to Commit"));
     }
+
+    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+        ContainerState> containerStates =
+        listener.states.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.NEW,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.LOCALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING), containerStates);
+
+    List<ContainerEventType> containerEventTypes =
+        listener.events.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(
+        ContainerEventType.INIT_CONTAINER,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        ContainerEventType.CONTAINER_LAUNCHED,
+        ContainerEventType.REINITIALIZE_CONTAINER,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
   }
 
   @Test
@@ -524,6 +601,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
   @Test
   public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
       InterruptedException, YarnException {
+    Listener listener = new Listener();
+    ((NodeManager.DefaultContainerStateListener)containerManager.context.
+        getContainerStateTransitionListener()).addListener(listener);
     String[] pids = testContainerReInitSuccess(false);
 
     // Test that the container can be Restarted after the successful upgrrade.
@@ -575,6 +655,67 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     Assert.assertNotEquals("The Rolled-back process should be a different pid",
         pids[0], rolledBackPid);
+
+    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+        ContainerState> containerStates =
+        listener.states.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.NEW,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.LOCALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        // This is the successful restart
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        // This is the rollback
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING), containerStates);
+
+    List<ContainerEventType> containerEventTypes =
+        listener.events.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(
+        ContainerEventType.INIT_CONTAINER,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        ContainerEventType.CONTAINER_LAUNCHED,
+        ContainerEventType.REINITIALIZE_CONTAINER,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        ContainerEventType.CONTAINER_LAUNCHED,
+        ContainerEventType.REINITIALIZE_CONTAINER,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        ContainerEventType.CONTAINER_LAUNCHED,
+        ContainerEventType.ROLLBACK_REINIT,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
   }
 
   @Test
@@ -584,6 +725,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
       return;
     }
     containerManager.start();
+    Listener listener = new Listener();
+    ((NodeManager.DefaultContainerStateListener)containerManager.context.
+        getContainerStateTransitionListener()).addListener(listener);
     // ////// Construct the Container-id
     ContainerId cId = createContainerId(0);
     File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
@@ -598,6 +742,32 @@ public class TestContainerManager extends BaseContainerManagerTest {
     // since upgrade was terminated..
     Assert.assertTrue("Process is NOT alive!",
         DefaultContainerExecutor.containerIsAlive(pid));
+
+    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+        ContainerState> containerStates =
+        listener.states.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.NEW,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.LOCALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING), containerStates);
+
+    List<ContainerEventType> containerEventTypes =
+        listener.events.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(
+        ContainerEventType.INIT_CONTAINER,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        ContainerEventType.CONTAINER_LAUNCHED,
+        ContainerEventType.REINITIALIZE_CONTAINER,
+        ContainerEventType.RESOURCE_FAILED), containerEventTypes);
   }
 
   @Test
@@ -632,6 +802,9 @@ public class TestContainerManager extends BaseContainerManagerTest {
       return;
     }
     containerManager.start();
+    Listener listener = new Listener();
+    ((NodeManager.DefaultContainerStateListener)containerManager.context.
+        getContainerStateTransitionListener()).addListener(listener);
     // ////// Construct the Container-id
     ContainerId cId = createContainerId(0);
     File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
@@ -666,6 +839,50 @@ public class TestContainerManager extends BaseContainerManagerTest {
 
     Assert.assertNotEquals("The Rolled-back process should be a different pid",
         pid, rolledBackPid);
+
+    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+        ContainerState> containerStates =
+        listener.states.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.NEW,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.LOCALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.REINITIALIZING_AWAITING_KILL,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING), containerStates);
+
+    List<ContainerEventType> containerEventTypes =
+        listener.events.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(
+        ContainerEventType.INIT_CONTAINER,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        ContainerEventType.CONTAINER_LAUNCHED,
+        ContainerEventType.REINITIALIZE_CONTAINER,
+        ContainerEventType.RESOURCE_LOCALIZED,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+        ContainerEventType.CONTAINER_LAUNCHED,
+        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
   }
 
   /**
@@ -1582,16 +1799,12 @@ public class TestContainerManager extends BaseContainerManagerTest {
         containerManager.updateContainer(updateRequest);
     // Check response
     Assert.assertEquals(
-        0, updateResponse.getSuccessfullyUpdatedContainers().size());
-    Assert.assertEquals(2, updateResponse.getFailedRequests().size());
+        1, updateResponse.getSuccessfullyUpdatedContainers().size());
+    Assert.assertEquals(1, updateResponse.getFailedRequests().size());
     for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
         .getFailedRequests().entrySet()) {
       Assert.assertNotNull("Failed message", entry.getValue().getMessage());
-      if (cId0.equals(entry.getKey())) {
-        Assert.assertTrue(entry.getValue().getMessage()
-          .contains("Resource can only be changed when a "
-              + "container is in RUNNING or SCHEDULED state"));
-      } else if (cId7.equals(entry.getKey())) {
+      if (cId7.equals(entry.getKey())) {
         Assert.assertTrue(entry.getValue().getMessage()
             .contains("Container " + cId7.toString()
                 + " is not handled by this NodeManager"));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59453dad/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 7c74049..4b380ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -47,11 +48,17 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -76,6 +83,40 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
     LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
   }
 
+  private static class Listener implements ContainerStateTransitionListener {
+
+    private final Map<ContainerId,
+        List<ContainerState>> states = new HashMap<>();
+    private final Map<ContainerId, List<ContainerEventType>> events =
+        new HashMap<>();
+
+    @Override
+    public void init(Context context) {}
+
+    @Override
+    public void preTransition(ContainerImpl op,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState beforeState,
+        ContainerEvent eventToBeProcessed) {
+      if (!states.containsKey(op.getContainerId())) {
+        states.put(op.getContainerId(), new ArrayList<>());
+        states.get(op.getContainerId()).add(beforeState);
+        events.put(op.getContainerId(), new ArrayList<>());
+      }
+    }
+
+    @Override
+    public void postTransition(ContainerImpl op,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState beforeState,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState afterState,
+        ContainerEvent processedEvent) {
+      states.get(op.getContainerId()).add(afterState);
+      events.get(op.getContainerId()).add(processedEvent.getType());
+    }
+  }
+
   private boolean delayContainers = true;
 
   @Override
@@ -542,6 +583,10 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
     containerManager.start();
     containerManager.getContainerScheduler().
         setUsePauseEventForPreemption(true);
+
+    Listener listener = new Listener();
+    ((NodeManager.DefaultContainerStateListener)containerManager.getContext().
+        getContainerStateTransitionListener()).addListener(listener);
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
 
@@ -606,6 +651,39 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
     // starts running
     BaseContainerManagerTest.waitForNMContainerState(containerManager,
         createContainerId(0), ContainerState.DONE, 40);
+
+    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+        ContainerState> containerStates =
+        listener.states.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.NEW,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.PAUSING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.PAUSED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RESUMING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.EXITED_WITH_SUCCESS,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.DONE), containerStates);
+    List<ContainerEventType> containerEventTypes =
+        listener.events.get(createContainerId(0));
+    Assert.assertEquals(Arrays.asList(ContainerEventType.INIT_CONTAINER,
+        ContainerEventType.CONTAINER_LAUNCHED,
+        ContainerEventType.PAUSE_CONTAINER,
+        ContainerEventType.CONTAINER_PAUSED,
+        ContainerEventType.RESUME_CONTAINER,
+        ContainerEventType.CONTAINER_RESUMED,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP), containerEventTypes);
   }
 
   /**
@@ -1068,6 +1146,9 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
   @Test
   public void testPromotionOfOpportunisticContainers() throws Exception {
     containerManager.start();
+    Listener listener = new Listener();
+    ((NodeManager.DefaultContainerStateListener)containerManager.getContext().
+        getContainerStateTransitionListener()).addListener(listener);
 
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
@@ -1150,6 +1231,7 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
     containerStatuses = containerManager
         .getContainerStatuses(statRequest).getContainerStatuses();
     Assert.assertEquals(1, containerStatuses.size());
+
     for (ContainerStatus status : containerStatuses) {
       if (org.apache.hadoop.yarn.api.records.ContainerState.RUNNING ==
           status.getState()) {
@@ -1160,6 +1242,25 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
 
     // Ensure no containers are queued.
     Assert.assertEquals(0, containerScheduler.getNumQueuedContainers());
+
+    List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+        ContainerState> containerStates =
+        listener.states.get(createContainerId(1));
+    Assert.assertEquals(Arrays.asList(
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.NEW,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
+            ContainerState.RUNNING), containerStates);
+    List<ContainerEventType> containerEventTypes =
+        listener.events.get(createContainerId(1));
+    Assert.assertEquals(Arrays.asList(
+        ContainerEventType.INIT_CONTAINER,
+        ContainerEventType.UPDATE_CONTAINER_TOKEN,
+        ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/4] hadoop git commit: YARN-5216. Expose configurable preemption policy for OPPORTUNISTIC containers running on the NM. (Hitesh Sharma via asuresh)

Posted by as...@apache.org.
YARN-5216. Expose configurable preemption policy for OPPORTUNISTIC containers running on the NM. (Hitesh Sharma via asuresh)

(cherry picked from commit 4f8194430fc6a69d9cc99b78828fd7045d5683e8)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8eebee50
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8eebee50
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8eebee50

Branch: refs/heads/branch-3.0
Commit: 8eebee5094e9112920793e45a08b78c464bc7787
Parents: dd51a74
Author: Arun Suresh <as...@apache.org>
Authored: Sat Dec 24 17:16:52 2016 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Thu Sep 28 11:24:20 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   9 ++
 .../src/main/resources/yarn-default.xml         |   9 ++
 .../containermanager/container/Container.java   |   2 +
 .../container/ContainerImpl.java                |  32 ++++--
 .../scheduler/ContainerScheduler.java           |  84 ++++++++++++---
 .../TestContainerSchedulerQueuing.java          | 103 +++++++++++++++++++
 .../nodemanager/webapp/MockContainer.java       |   5 +
 7 files changed, 218 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eebee50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 572bab9..9985458 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1034,6 +1034,15 @@ public class YarnConfiguration extends Configuration {
       NM_PREFIX + "container-retry-minimum-interval-ms";
   public static final int DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS = 1000;
 
+  /**
+   * Whether to pause, rather than kill, OPPORTUNISTIC containers that are
+   * preempted by the container scheduler at a NodeManager.
+   */
+  public static final String NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION =
+      NM_PREFIX + "opportunistic-containers-use-pause-for-preemption";
+  public static final boolean
+      DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION = false;
+
   /** Interval at which the delayed token removal thread runs */
   public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
       RM_PREFIX + "delayed.delegation-token.removal-interval-ms";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eebee50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index cf17dfc..4578f5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3028,6 +3028,15 @@
 
   <property>
     <description>
+    Whether to pause, rather than kill, OPPORTUNISTIC containers that are
+    preempted by the container scheduler at a NodeManager.
+    </description>
+    <name>yarn.nodemanager.opportunistic-containers-use-pause-for-preemption</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
     Error filename pattern, to identify the file in the container's
     Log directory which contain the container's error log. As error file
     redirection is done by client/AM and yarn will not be aware of the error

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eebee50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index ac9fbb7..ae83b88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -96,4 +96,6 @@ public interface Container extends EventHandler<ContainerEvent> {
   void sendKillEvent(int exitStatus, String description);
 
   boolean isRecovering();
+
+  void sendPauseEvent(String description);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eebee50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 6f90cdf..9e7a664 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -814,15 +814,22 @@ public class ContainerImpl implements Container {
   @SuppressWarnings("unchecked") // dispatcher not typed
   @Override
   public void sendLaunchEvent() {
-    ContainersLauncherEventType launcherEvent =
-        ContainersLauncherEventType.LAUNCH_CONTAINER;
-    if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
-      // try to recover a container that was previously launched
-      launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+    if (ContainerState.PAUSED == getContainerState()) {
+      dispatcher.getEventHandler().handle(
+          new ContainerResumeEvent(containerId,
+              "Container Resumed as some resources freed up"));
+    } else {
+      ContainersLauncherEventType launcherEvent =
+          ContainersLauncherEventType.LAUNCH_CONTAINER;
+      if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
+        // try to recover a container that was previously launched
+        launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+      }
+      containerLaunchStartTime = clock.getTime();
+      dispatcher.getEventHandler().handle(
+          new ContainersLauncherEvent(this, launcherEvent));
     }
-    containerLaunchStartTime = clock.getTime();
-    dispatcher.getEventHandler().handle(
-        new ContainersLauncherEvent(this, launcherEvent));
+
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed
@@ -842,6 +849,13 @@ public class ContainerImpl implements Container {
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed
+  @Override
+  public void sendPauseEvent(String description) {
+    dispatcher.getEventHandler().handle(
+        new ContainerPauseEvent(containerId, description));
+  }
+
+  @SuppressWarnings("unchecked") // dispatcher not typed
   private void sendRelaunchEvent() {
     ContainersLauncherEventType launcherEvent =
         ContainersLauncherEventType.RELAUNCH_CONTAINER;
@@ -1797,7 +1811,7 @@ public class ContainerImpl implements Container {
 
   /**
    * Transitions upon receiving PAUSE_CONTAINER.
-   * - RUNNING -> PAUSED
+   * - RUNNING -> PAUSING
    */
   @SuppressWarnings("unchecked") // dispatcher not typed
   static class PauseContainerTransition implements

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eebee50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 7780f9f..830a06d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
     .ChangeMonitoringContainerResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 
 
@@ -74,7 +76,7 @@ public class ContainerScheduler extends AbstractService implements
       queuedOpportunisticContainers = new LinkedHashMap<>();
 
   // Used to keep track of containers that have been marked to be killed
-  // to make room for a guaranteed container.
+  // or paused to make room for a guaranteed container.
   private final Map<ContainerId, Container> oppContainersToKill =
       new HashMap<>();
 
@@ -98,6 +100,8 @@ public class ContainerScheduler extends AbstractService implements
   private final AsyncDispatcher dispatcher;
   private final NodeManagerMetrics metrics;
 
+  private boolean usePauseEventForPreemption = false; // pause instead of kill
+
   /**
    * Instantiate a Container Scheduler.
    * @param context NodeManager Context.
@@ -112,6 +116,17 @@ public class ContainerScheduler extends AbstractService implements
             DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
   }
 
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    this.usePauseEventForPreemption =
+        conf.getBoolean(
+            YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
+            YarnConfiguration.
+                DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
+  }
+
   @VisibleForTesting
   public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
       NodeManagerMetrics metrics, int qLength) {
@@ -136,8 +151,9 @@ public class ContainerScheduler extends AbstractService implements
     case SCHEDULE_CONTAINER:
       scheduleContainer(event.getContainer());
       break;
+    case CONTAINER_PAUSED:
     case CONTAINER_COMPLETED:
-      onContainerCompleted(event.getContainer());
+      onResourcesReclaimed(event.getContainer());
       break;
     case UPDATE_CONTAINER:
       if (event instanceof UpdateContainerSchedulerEvent) {
@@ -203,9 +219,9 @@ public class ContainerScheduler extends AbstractService implements
             queuedGuaranteedContainers.put(containerId,
                 updateEvent.getContainer());
           }
-          //Kill opportunistic containers if any to make room for
+          //Kill/pause opportunistic containers if any to make room for
           // promotion request
-          killOpportunisticContainers(updateEvent.getContainer());
+          reclaimOpportunisticContainerResources(updateEvent.getContainer());
         } else {
           // Demotion of queued container.. Should not happen too often
           // since you should not find too many queued guaranteed
@@ -243,6 +259,12 @@ public class ContainerScheduler extends AbstractService implements
     return this.runningContainers.size();
   }
 
+  @VisibleForTesting // test hook: overrides the value read in serviceInit
+  public void setUsePauseEventForPreemption(
+      boolean usePauseEventForPreemption) {
+    this.usePauseEventForPreemption = usePauseEventForPreemption;
+  }
+
   public OpportunisticContainersStatus getOpportunisticContainersStatus() {
     this.opportunisticContainersStatus.setQueuedOpportContainers(
         getNumQueuedOpportunisticContainers());
@@ -257,7 +279,7 @@ public class ContainerScheduler extends AbstractService implements
     return this.opportunisticContainersStatus;
   }
 
-  private void onContainerCompleted(Container container) {
+  private void onResourcesReclaimed(Container container) {
     oppContainersToKill.remove(container.getContainerId());
 
     // This could be killed externally for eg. by the ContainerManager,
@@ -292,6 +314,23 @@ public class ContainerScheduler extends AbstractService implements
     // Start pending guaranteed containers, if resources available.
     boolean resourcesAvailable = startContainers(
         queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
+    // Resume opportunistic containers, if resource available.
+    if (resourcesAvailable) {
+      List<Container> pausedContainers = new ArrayList<Container>();
+      Map<ContainerId, Container> containers =
+          context.getContainers();
+      for (Map.Entry<ContainerId, Container>entry : containers.entrySet()) {
+        ContainerId contId = entry.getKey();
+        // Find containers that were not already started and are in paused state
+        if(false == runningContainers.containsKey(contId)) {
+          if(containers.get(contId).getContainerState()
+              == ContainerState.PAUSED) {
+            pausedContainers.add(containers.get(contId));
+          }
+        }
+      }
+      resourcesAvailable = startContainers(pausedContainers, false);
+    }
     // Start opportunistic containers, if resources available.
     if (resourcesAvailable) {
       startContainers(queuedOpportunisticContainers.values(), false);
@@ -395,7 +434,7 @@ public class ContainerScheduler extends AbstractService implements
       // if the guaranteed container is queued, we need to preempt opportunistic
       // containers for make room for it
       if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
-        killOpportunisticContainers(container);
+        reclaimOpportunisticContainerResources(container);
       }
     } else {
       // Given an opportunistic container, we first try to start as many queuing
@@ -413,19 +452,30 @@ public class ContainerScheduler extends AbstractService implements
     }
   }
 
-  private void killOpportunisticContainers(Container container) {
-    List<Container> extraOpportContainersToKill =
-        pickOpportunisticContainersToKill(container.getContainerId());
+  @SuppressWarnings("unchecked")
+  private void reclaimOpportunisticContainerResources(Container container) {
+    List<Container> extraOppContainersToReclaim =
+        pickOpportunisticContainersToReclaimResources(
+            container.getContainerId());
     // Kill the opportunistic containers that were chosen.
-    for (Container contToKill : extraOpportContainersToKill) {
-      contToKill.sendKillEvent(
-          ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
-          "Container Killed to make room for Guaranteed Container.");
-      oppContainersToKill.put(contToKill.getContainerId(), contToKill);
+    for (Container contToReclaim : extraOppContainersToReclaim) {
+      String preemptionAction = usePauseEventForPreemption ? "paused" :
+          "killed";
       LOG.info(
-          "Opportunistic container {} will be killed in order to start the "
+          "Container {} will be {} to start the "
               + "execution of guaranteed container {}.",
-          contToKill.getContainerId(), container.getContainerId());
+          contToReclaim.getContainerId(), preemptionAction,
+          container.getContainerId());
+      // Pause when configured to do so; otherwise kill, as before this change.
+      if (usePauseEventForPreemption) {
+        contToReclaim.sendPauseEvent(
+            "Container Paused to make room for Guaranteed Container");
+      } else {
+        contToReclaim.sendKillEvent(
+            ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+            "Container Killed to make room for Guaranteed Container.");
+      }
+      oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
     }
   }
 
@@ -440,7 +490,7 @@ public class ContainerScheduler extends AbstractService implements
     container.sendLaunchEvent();
   }
 
-  private List<Container> pickOpportunisticContainersToKill(
+  private List<Container> pickOpportunisticContainersToReclaimResources(
       ContainerId containerToStartId) {
     // The opportunistic containers that need to be killed for the
     // given container to start.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eebee50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 7dfb05f..7c74049 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -124,18 +127,38 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
   @Override
   protected ContainerExecutor createContainerExecutor() {
     DefaultContainerExecutor exec = new DefaultContainerExecutor() {
+      ConcurrentMap<String, Boolean> oversleepMap =
+          new ConcurrentHashMap<String, Boolean>();
       @Override
       public int launchContainer(ContainerStartContext ctx)
           throws IOException, ConfigurationException {
+        oversleepMap.put(ctx.getContainer().getContainerId().toString(), false);
         if (delayContainers) {
           try {
             Thread.sleep(10000);
+            String id = ctx.getContainer().getContainerId().toString();
+            if (oversleepMap.get(id)) {
+              Thread.sleep(10000);
+            }
           } catch (InterruptedException e) {
             // Nothing..
           }
         }
         return super.launchContainer(ctx);
       }
+
+      @Override
+      public void pauseContainer(Container container) {
+        // Mimic pausing: flag the container so launchContainer() sleeps a
+        // second time, keeping it in the PAUSED state a little longer.
+        oversleepMap.put(container.getContainerId().toString(), Boolean.TRUE);
+        LOG.info("Container was paused");
+      }
+
+      @Override
+      public void resumeContainer(Container container) {
+        LOG.info("Container was resumed"); // only logs; flag is not cleared
+      }
     };
     exec.setConf(conf);
     return spy(exec);
@@ -506,6 +529,86 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
   }
 
   /**
+   * Submit one OPPORTUNISTIC and one GUARANTEED container. The resources
+   * requested by each container are such that only one can run at a time.
+   * Thus, the OPPORTUNISTIC container that started running will be
+   * paused for the GUARANTEED container to start.
+   * Once the GUARANTEED container finishes its execution, the paused
+   * OPPORTUNISTIC container is expected to resume and run to completion.
+   * @throws Exception
+   */
+  @Test
+  public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
+    containerManager.start();
+    containerManager.getContainerScheduler().
+        setUsePauseEventForPreemption(true);
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(0), ContainerState.RUNNING, 40);
+
+    list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED)));
+    allRequests =
+        StartContainersRequest.newInstance(list);
+
+    containerManager.startContainers(allRequests);
+
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(1), ContainerState.RUNNING, 40);
+
+    // Get container statuses. Container 0 should carry the pause diagnostic,
+    // container 1 should be running.
+    List<ContainerId> statList = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getContainerId().equals(createContainerId(0))) {
+        Assert.assertTrue(status.getDiagnostics().contains(
+            "Container Paused to make room for Guaranteed Container"));
+      } else if (status.getContainerId().equals(createContainerId(1))) {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+            status.getState());
+      }
+      System.out.println("\nStatus : [" + status + "]\n");
+    }
+
+    // Make sure that the GUARANTEED container completes
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(1), ContainerState.DONE, 40);
+    // Make sure that the PAUSED opportunistic container resumes and
+    // runs to completion
+    BaseContainerManagerTest.waitForNMContainerState(containerManager,
+        createContainerId(0), ContainerState.DONE, 40);
+  }
+
+  /**
    * 1. Submit a long running GUARANTEED container to hog all NM resources.
    * 2. Submit 6 OPPORTUNISTIC containers, all of which will be queued.
    * 3. Update the Queue Limit to 2.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8eebee50/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 57bee8c..8ad6d7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -239,4 +239,9 @@ public class MockContainer implements Container {
   public long getContainerStartTime() {
     return 0;
   }
+
+  @Override
+  public void sendPauseEvent(String description) {
+    // Intentionally a no-op in this mock.
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org