You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [6/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java Tue Aug 19 23:49:39 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -66,4 +67,10 @@ public interface Context {
   LocalDirsHandlerService getLocalDirsHandler();
 
   ApplicationACLsManager getApplicationACLsManager();
+
+  NMStateStoreService getNMStateStore();
+
+  boolean getDecommissioned();
+
+  void setDecommissioned(boolean isDecommissioned);
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import com.google.common.base.Optional;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 
@@ -212,10 +213,21 @@ public class DefaultContainerExecutor ex
           && exitCode != ExitCode.TERMINATED.getExitCode()) {
         LOG.warn("Exception from container-launch with container ID: "
             + containerId + " and exit code: " + exitCode , e);
-        logOutput(shExec.getOutput());
-        String diagnostics = "Exception from container-launch: "
-            + e + "\n"
-            + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
+
+        StringBuilder builder = new StringBuilder();
+        builder.append("Exception from container-launch.\n");
+        builder.append("Container id: " + containerId + "\n");
+        builder.append("Exit code: " + exitCode + "\n");
+        if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
+          builder.append("Exception message: " + e.getMessage() + "\n");
+        }
+        builder.append("Stack trace: "
+            + StringUtils.stringifyException(e) + "\n");
+        if (!shExec.getOutput().isEmpty()) {
+          builder.append("Shell output: " + shExec.getOutput() + "\n");
+        }
+        String diagnostics = builder.toString();
+        logOutput(diagnostics);
         container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
             diagnostics));
       } else {
@@ -261,25 +273,57 @@ public class DefaultContainerExecutor ex
 
   private final class UnixLocalWrapperScriptBuilder
       extends LocalWrapperScriptBuilder {
+    private final Path sessionScriptPath;
 
     public UnixLocalWrapperScriptBuilder(Path containerWorkDir) {
       super(containerWorkDir);
+      this.sessionScriptPath = new Path(containerWorkDir,
+          Shell.appendScriptExtension("default_container_executor_session"));
+    }
+
+    @Override
+    public void writeLocalWrapperScript(Path launchDst, Path pidFile)
+        throws IOException {
+      writeSessionScript(launchDst, pidFile);
+      super.writeLocalWrapperScript(launchDst, pidFile);
     }
 
     @Override
     public void writeLocalWrapperScript(Path launchDst, Path pidFile,
         PrintStream pout) {
-
-      // We need to do a move as writing to a file is not atomic
-      // Process reading a file being written to may get garbled data
-      // hence write pid to tmp file first followed by a mv
+      String exitCodeFile = ContainerLaunch.getExitCodeFile(
+          pidFile.toString());
+      String tmpFile = exitCodeFile + ".tmp";
       pout.println("#!/bin/bash");
-      pout.println();
-      pout.println("echo $$ > " + pidFile.toString() + ".tmp");
-      pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
-      String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
-      pout.println(exec + " /bin/bash \"" +
-        launchDst.toUri().getPath().toString() + "\"");
+      pout.println("/bin/bash \"" + sessionScriptPath.toString() + "\"");
+      pout.println("rc=$?");
+      pout.println("echo $rc > \"" + tmpFile + "\"");
+      pout.println("/bin/mv -f \"" + tmpFile + "\" \"" + exitCodeFile + "\"");
+      pout.println("exit $rc");
+    }
+
+    private void writeSessionScript(Path launchDst, Path pidFile)
+        throws IOException {
+      DataOutputStream out = null;
+      PrintStream pout = null;
+      try {
+        out = lfs.create(sessionScriptPath, EnumSet.of(CREATE, OVERWRITE));
+        pout = new PrintStream(out);
+        // Writes are not atomic and a reader could see a partial pid file, so
+        // write to a (shell-quoted) tmp file first and then mv it into place.
+        String pidTmp = "\"" + pidFile.toString() + ".tmp\"";
+        pout.println("#!/bin/bash");
+        pout.println();
+        pout.println("echo $$ > " + pidTmp);
+        pout.println("/bin/mv -f " + pidTmp + " \"" + pidFile + "\"");
+        String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
+        pout.println(exec + " /bin/bash \"" +
+            launchDst.toUri().getPath().toString() + "\"");
+      } finally {
+        IOUtils.cleanup(LOG, pout, out);
+      }
+      lfs.setPermission(sessionScriptPath,
+          ContainerExecutor.TASK_LAUNCH_SCRIPT_PERMISSION);
     }
   }
 
@@ -298,6 +342,7 @@ public class DefaultContainerExecutor ex
     @Override
     public void writeLocalWrapperScript(Path launchDst, Path pidFile,
         PrintStream pout) {
+      // TODO: exit code script for Windows
 
       // On Windows, the pid is the container ID, so that it can also serve as
       // the name of the job object created by winutils for task management.
@@ -330,6 +375,12 @@ public class DefaultContainerExecutor ex
     return true;
   }
 
+  @Override
+  public boolean isContainerProcessAlive(String user, String pid)
+      throws IOException {
+    return containerIsAlive(pid);  // pid-only check; user is unused
+  }
+
   /**
    * Returns true if the process with the specified pid is alive.
    * 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java Tue Aug 19 23:49:39 2014
@@ -21,10 +21,13 @@ package org.apache.hadoop.yarn.server.no
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -40,6 +43,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredDeletionServiceState;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -50,6 +57,8 @@ public class DeletionService extends Abs
   private final ContainerExecutor exec;
   private ScheduledThreadPoolExecutor sched;
   private static final FileContext lfs = getLfs();
+  private final NMStateStoreService stateStore;
+  private AtomicInteger nextTaskId = new AtomicInteger(0);
 
   static final FileContext getLfs() {
     try {
@@ -60,14 +69,18 @@ public class DeletionService extends Abs
   }
 
   public DeletionService(ContainerExecutor exec) {
+    this(exec, new NMNullStateStoreService());
+  }
+
+  public DeletionService(ContainerExecutor exec,
+      NMStateStoreService stateStore) {
     super(DeletionService.class.getName());
     this.exec = exec;
     this.debugDelay = 0;
+    this.stateStore = stateStore;
   }
   
   /**
-   * 
-  /**
    * Delete the path(s) as this user.
    * @param user The user to delete as, or the JVM user if null
    * @param subDir the sub directory name
@@ -76,19 +89,20 @@ public class DeletionService extends Abs
   public void delete(String user, Path subDir, Path... baseDirs) {
     // TODO if parent owned by NM, rename within parent inline
     if (debugDelay != -1) {
-      if (baseDirs == null || baseDirs.length == 0) {
-        sched.schedule(new FileDeletionTask(this, user, subDir, null),
-          debugDelay, TimeUnit.SECONDS);
-      } else {
-        sched.schedule(
-          new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs)),
-          debugDelay, TimeUnit.SECONDS);
+      List<Path> baseDirList = null;
+      if (baseDirs != null && baseDirs.length != 0) {
+        baseDirList = Arrays.asList(baseDirs);
       }
+      FileDeletionTask task =
+          new FileDeletionTask(this, user, subDir, baseDirList);
+      recordDeletionTaskInStateStore(task);
+      sched.schedule(task, debugDelay, TimeUnit.SECONDS);
     }
   }
   
   public void scheduleFileDeletionTask(FileDeletionTask fileDeletionTask) {
     if (debugDelay != -1) {
+      recordDeletionTaskInStateStore(fileDeletionTask);
       sched.schedule(fileDeletionTask, debugDelay, TimeUnit.SECONDS);
     }
   }
@@ -109,6 +123,9 @@ public class DeletionService extends Abs
     }
     sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
     sched.setKeepAliveTime(60L, SECONDS);
+    if (stateStore.canRecover()) {
+      recover(stateStore.loadDeletionServiceState());
+    }
     super.serviceInit(conf);
   }
 
@@ -139,6 +156,8 @@ public class DeletionService extends Abs
   }
 
   public static class FileDeletionTask implements Runnable {
+    public static final int INVALID_TASK_ID = -1;
+    private int taskId;
     private final String user;
     private final Path subDir;
     private final List<Path> baseDirs;
@@ -152,6 +171,12 @@ public class DeletionService extends Abs
     
     private FileDeletionTask(DeletionService delService, String user,
         Path subDir, List<Path> baseDirs) {
+      this(INVALID_TASK_ID, delService, user, subDir, baseDirs);
+    }
+
+    private FileDeletionTask(int taskId, DeletionService delService,
+        String user, Path subDir, List<Path> baseDirs) {
+      this.taskId = taskId;
       this.delService = delService;
       this.user = user;
       this.subDir = subDir;
@@ -198,6 +223,12 @@ public class DeletionService extends Abs
       return this.success;
     }
     
+    public synchronized FileDeletionTask[] getSuccessorTasks() {
+      // snapshot the successor set into an exactly-sized array
+      final int count = successorTaskSet.size();
+      return successorTaskSet.toArray(new FileDeletionTask[count]);
+    }
+
     @Override
     public void run() {
       if (LOG.isDebugEnabled()) {
@@ -286,6 +317,12 @@ public class DeletionService extends Abs
      * dependent tasks of it has failed marking its success = false.  
      */
     private synchronized void fileDeletionTaskFinished() {
+      try {
+        delService.stateStore.removeDeletionTask(taskId);
+      } catch (IOException e) {
+        LOG.error("Unable to remove deletion task " + taskId
+            + " from state store", e);
+      }
       Iterator<FileDeletionTask> successorTaskI =
           this.successorTaskSet.iterator();
       while (successorTaskI.hasNext()) {
@@ -318,4 +355,129 @@ public class DeletionService extends Abs
       Path[] baseDirs) {
     return new FileDeletionTask(this, user, subDir, Arrays.asList(baseDirs));
   }
+
+  private void recover(RecoveredDeletionServiceState state)
+      throws IOException {
+    List<DeletionServiceDeleteTaskProto> storedTasks = state.getTasks();
+    Map<Integer, DeletionTaskRecoveryInfo> recoveredById =
+        new HashMap<Integer, DeletionTaskRecoveryInfo>(storedTasks.size());
+    Set<Integer> idsWithPredecessor = new HashSet<Integer>();
+    for (DeletionServiceDeleteTaskProto storedTask : storedTasks) {
+      DeletionTaskRecoveryInfo recovered = parseTaskProto(storedTask);
+      recoveredById.put(recovered.task.taskId, recovered);
+      nextTaskId.set(Math.max(nextTaskId.get(), recovered.task.taskId));
+      idsWithPredecessor.addAll(recovered.successorTaskIds);
+    }
+
+    // rebuild the dependency links; only tasks that are nobody's successor
+    // get scheduled here
+    final long recoveryTime = System.currentTimeMillis();
+    for (DeletionTaskRecoveryInfo recovered : recoveredById.values()) {
+      for (Integer successorId : recovered.successorTaskIds) {
+        DeletionTaskRecoveryInfo successor = recoveredById.get(successorId);
+        if (successor == null) {
+          LOG.error("Unable to locate dependency task for deletion task "
+              + recovered.task.taskId + " at " + recovered.task.getSubDir());
+        } else {
+          recovered.task.addFileDeletionTaskDependency(successor.task);
+        }
+      }
+      if (!idsWithPredecessor.contains(recovered.task.taskId)) {
+        long delayMsec = recovered.deletionTimestamp - recoveryTime;
+        sched.schedule(recovered.task, delayMsec, TimeUnit.MILLISECONDS);
+      }
+    }
+  }
+
+  private DeletionTaskRecoveryInfo parseTaskProto(
+      DeletionServiceDeleteTaskProto proto) throws IOException {
+    // optional proto fields that are unset map to null
+    final String owner = proto.hasUser() ? proto.getUser() : null;
+    final Path subDirPath =
+        proto.hasSubdir() ? new Path(proto.getSubdir()) : null;
+
+    // an empty base-dir list is represented as null, not an empty list
+    List<Path> baseDirPaths = null;
+    List<String> storedBaseDirs = proto.getBasedirsList();
+    if (storedBaseDirs != null && !storedBaseDirs.isEmpty()) {
+      baseDirPaths = new ArrayList<Path>(storedBaseDirs.size());
+      for (String storedBaseDir : storedBaseDirs) {
+        baseDirPaths.add(new Path(storedBaseDir));
+      }
+    }
+
+    FileDeletionTask recoveredTask = new FileDeletionTask(
+        proto.getId(), this, owner, subDirPath, baseDirPaths);
+    List<Integer> successorIds = proto.getSuccessorIdsList();
+    return new DeletionTaskRecoveryInfo(recoveredTask, successorIds,
+        proto.getDeletionTime());
+  }
+
+  private int generateTaskId() {
+    // keep drawing IDs until one differs from the reserved invalid value
+    int candidate;
+    do {
+      candidate = nextTaskId.incrementAndGet();
+    } while (candidate == FileDeletionTask.INVALID_TASK_ID);
+    return candidate;
+  }
+
+  private void recordDeletionTaskInStateStore(FileDeletionTask task) {
+    if (!stateStore.canRecover()) {
+      // optimize the case where we aren't really recording
+      return;
+    }
+    if (task.taskId != FileDeletionTask.INVALID_TASK_ID) {
+      return;  // task already recorded
+    }
+    // take an ID before recursing so a revisit stops at the guard above
+    task.taskId = generateTaskId();
+
+    FileDeletionTask[] successors = task.getSuccessorTasks();
+
+    // store successors first to ensure task IDs have been generated for them
+    for (FileDeletionTask successor : successors) {
+      recordDeletionTaskInStateStore(successor);
+    }
+    // build the proto; user, subdir and basedirs are set only when non-null
+    DeletionServiceDeleteTaskProto.Builder builder =
+        DeletionServiceDeleteTaskProto.newBuilder();
+    builder.setId(task.taskId);
+    if (task.getUser() != null) {
+      builder.setUser(task.getUser());
+    }
+    if (task.getSubDir() != null) {
+      builder.setSubdir(task.getSubDir().toString());
+    }
+    builder.setDeletionTime(System.currentTimeMillis() +
+        TimeUnit.MILLISECONDS.convert(debugDelay, TimeUnit.SECONDS));
+    if (task.getBaseDirs() != null) {
+      for (Path dir : task.getBaseDirs()) {
+        builder.addBasedirs(dir.toString());
+      }
+    }
+    for (FileDeletionTask successor : successors) {
+      builder.addSuccessorIds(successor.taskId);
+    }
+    // a failed store is logged and not propagated to the caller
+    try {
+      stateStore.storeDeletionTask(task.taskId, builder.build());
+    } catch (IOException e) {
+      LOG.error("Unable to store deletion task " + task.taskId + " for "
+          + task.getSubDir(), e);
+    }
+  }
+
+  private static class DeletionTaskRecoveryInfo {
+    FileDeletionTask task;  // task rebuilt from a stored proto
+    List<Integer> successorTaskIds;  // IDs of this task's successors
+    long deletionTimestamp;  // due time, compared to currentTimeMillis()
+
+    public DeletionTaskRecoveryInfo(FileDeletionTask task,
+        List<Integer> successorTaskIds, long deletionTimestamp) {
+      this.task = task;
+      this.successorTaskIds = successorTaskIds;
+      this.deletionTimestamp = deletionTimestamp;
+    }
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import com.google.common.base.Optional;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -296,9 +297,21 @@ public class LinuxContainerExecutor exte
           && exitCode != ExitCode.TERMINATED.getExitCode()) {
         LOG.warn("Exception from container-launch with container ID: "
             + containerId + " and exit code: " + exitCode , e);
-        logOutput(shExec.getOutput());
-        String diagnostics = "Exception from container-launch: \n"
-            + StringUtils.stringifyException(e) + "\n" + shExec.getOutput();
+
+        StringBuilder builder = new StringBuilder();
+        builder.append("Exception from container-launch.\n");
+        builder.append("Container id: " + containerId + "\n");
+        builder.append("Exit code: " + exitCode + "\n");
+        if (!Optional.fromNullable(e.getMessage()).or("").isEmpty()) {
+          builder.append("Exception message: " + e.getMessage() + "\n");
+        }
+        builder.append("Stack trace: "
+            + StringUtils.stringifyException(e) + "\n");
+        if (!shExec.getOutput().isEmpty()) {
+          builder.append("Shell output: " + shExec.getOutput() + "\n");
+        }
+        String diagnostics = builder.toString();
+        logOutput(diagnostics);
         container.handle(new ContainerDiagnosticsUpdateEvent(containerId,
             diagnostics));
       } else {
@@ -390,6 +403,13 @@ public class LinuxContainerExecutor exte
     }
   }
   
+  @Override
+  public boolean isContainerProcessAlive(String user, String pid)
+      throws IOException {
+    // Probe liveness by sending Signal.NULL to the pid on behalf of the user
+    return signalContainer(user, pid, Signal.NULL);
+  }
+
   public void mountCgroups(List<String> cgroupKVs, String hierarchy)
          throws IOException {
     List<String> command = new ArrayList<String>(

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue Aug 19 23:49:39 2014
@@ -53,6 +53,9 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
@@ -78,9 +81,11 @@ public class NodeManager extends Composi
   private ContainerManagerImpl containerManager;
   private NodeStatusUpdater nodeStatusUpdater;
   private static CompositeServiceShutdownHook nodeManagerShutdownHook; 
+  private NMStateStoreService nmStore = null;
   
   private AtomicBoolean isStopping = new AtomicBoolean(false);
-  
+  private boolean rmWorkPreservingRestartEnabled;
+
   public NodeManager() {
     super(NodeManager.class.getName());
   }
@@ -110,14 +115,15 @@ public class NodeManager extends Composi
   }
 
   protected DeletionService createDeletionService(ContainerExecutor exec) {
-    return new DeletionService(exec);
+    return new DeletionService(exec, nmStore);
   }
 
   protected NMContext createNMContext(
       NMContainerTokenSecretManager containerTokenSecretManager,
-      NMTokenSecretManagerInNM nmTokenSecretManager) {
+      NMTokenSecretManagerInNM nmTokenSecretManager,
+      NMStateStoreService stateStore) {
     return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
-        dirsHandler, aclsManager);
+        dirsHandler, aclsManager, stateStore);
   }
 
   protected void doSecureLogin() throws IOException {
@@ -125,11 +131,8 @@ public class NodeManager extends Composi
         YarnConfiguration.NM_PRINCIPAL);
   }
 
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-
-    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
-
+  private void initAndStartRecoveryStore(Configuration conf)
+      throws IOException {
     boolean recoveryEnabled = conf.getBoolean(
         YarnConfiguration.NM_RECOVERY_ENABLED,
         YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED);
@@ -142,13 +145,57 @@ public class NodeManager extends Composi
       }
       Path recoveryRoot = new Path(recoveryDirName);
       recoveryFs.mkdirs(recoveryRoot, new FsPermission((short)0700));
+      nmStore = new NMLeveldbStateStoreService();
+    } else {
+      nmStore = new NMNullStateStoreService();
+    }
+    nmStore.init(conf);
+    nmStore.start();
+  }
+
+  private void stopRecoveryStore() throws IOException {
+    if (nmStore != null) {  // null when serviceInit failed early
+      nmStore.stop();
+      if (context != null && context.getDecommissioned()
+          && nmStore.canRecover()) {
+        Path recoveryRoot = new Path(
+            getConfig().get(YarnConfiguration.NM_RECOVERY_DIR));
+        LOG.info("Removing state store at " + recoveryRoot
+            + " due to decommission");
+        if (!FileSystem.getLocal(getConfig()).delete(recoveryRoot, true)) {
+          LOG.warn("Unable to delete " + recoveryRoot);
+        }
+      }
+    }
+  }
+
+  private void recoverTokens(NMTokenSecretManagerInNM nmTokenSecretManager,
+      NMContainerTokenSecretManager containerTokenSecretManager)
+          throws IOException {
+    if (nmStore.canRecover()) {
+      nmTokenSecretManager.recover();
+      containerTokenSecretManager.recover();
     }
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+    rmWorkPreservingRestartEnabled = conf.getBoolean(YarnConfiguration
+            .RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED);
+
+    initAndStartRecoveryStore(conf);
 
     NMContainerTokenSecretManager containerTokenSecretManager =
-        new NMContainerTokenSecretManager(conf);
+        new NMContainerTokenSecretManager(conf, nmStore);
 
     NMTokenSecretManagerInNM nmTokenSecretManager =
-        new NMTokenSecretManagerInNM();
+        new NMTokenSecretManagerInNM(nmStore);
+
+    recoverTokens(nmTokenSecretManager, containerTokenSecretManager);
     
     this.aclsManager = new ApplicationACLsManager(conf);
 
@@ -171,7 +218,7 @@ public class NodeManager extends Composi
     dirsHandler = nodeHealthChecker.getDiskHandler();
 
     this.context = createNMContext(containerTokenSecretManager,
-        nmTokenSecretManager);
+        nmTokenSecretManager, nmStore);
     
     nodeStatusUpdater =
         createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
@@ -220,6 +267,7 @@ public class NodeManager extends Composi
       return;
     }
     super.serviceStop();
+    stopRecoveryStore();
     DefaultMetricsSystem.shutdown();
   }
 
@@ -244,8 +292,12 @@ public class NodeManager extends Composi
         try {
           LOG.info("Notifying ContainerManager to block new container-requests");
           containerManager.setBlockNewContainerRequests(true);
-          LOG.info("Cleaning up running containers on resync");
-          containerManager.cleanupContainersOnNMResync();
+          if (!rmWorkPreservingRestartEnabled) {
+            LOG.info("Cleaning up running containers on resync");
+            containerManager.cleanupContainersOnNMResync();
+          } else {
+            LOG.info("Preserving containers on resync");
+          }
           ((NodeStatusUpdaterImpl) nodeStatusUpdater)
             .rebootNodeStatusUpdaterAndRegisterWithRM();
         } catch (YarnRuntimeException e) {
@@ -272,10 +324,13 @@ public class NodeManager extends Composi
     private WebServer webServer;
     private final NodeHealthStatus nodeHealthStatus = RecordFactoryProvider
         .getRecordFactory(null).newRecordInstance(NodeHealthStatus.class);
-        
+    private final NMStateStoreService stateStore;
+    private boolean isDecommissioned = false;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
-        LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager) {
+        LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
+        NMStateStoreService stateStore) {
       this.containerTokenSecretManager = containerTokenSecretManager;
       this.nmTokenSecretManager = nmTokenSecretManager;
       this.dirsHandler = dirsHandler;
@@ -283,6 +338,7 @@ public class NodeManager extends Composi
       this.nodeHealthStatus.setIsNodeHealthy(true);
       this.nodeHealthStatus.setHealthReport("Healthy");
       this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
+      this.stateStore = stateStore;
     }
 
     /**
@@ -349,6 +405,21 @@ public class NodeManager extends Composi
     public ApplicationACLsManager getApplicationACLsManager() {
       return aclsManager;
     }
+
+    @Override
+    public NMStateStoreService getNMStateStore() {
+      return stateStore;  // the store handed to the NMContext constructor
+    }
+
+    @Override
+    public boolean getDecommissioned() {
+      return isDecommissioned;  // starts out false
+    }
+
+    @Override
+    public void setDecommissioned(boolean isDecommissioned) {
+      this.isDecommissioned = isDecommissioned;  // unsynchronized write
+    }
   }
 
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Tue Aug 19 23:49:39 2014
@@ -23,11 +23,34 @@ import org.apache.hadoop.yarn.api.record
 
 public interface NodeStatusUpdater extends Service {
 
+  /**
+   * Schedule a heartbeat to the ResourceManager outside of the normal,
+   * periodic heartbeating process. This is typically called when the state
+   * of containers on the node has changed to notify the RM sooner.
+   */
   void sendOutofBandHeartBeat();
 
+  /**
+   * Get the ResourceManager identifier received during registration
+   * @return the ResourceManager ID
+   */
   long getRMIdentifier();
   
+  /**
+   * Query if a container has recently completed
+   * @param containerId the container ID
+   * @return true if the container has recently completed
+   */
   public boolean isContainerRecentlyStopped(ContainerId containerId);
   
+  /**
+   * Add a container to the list of containers that have recently completed
+   * @param containerId the ID of the completed container
+   */
+  public void addCompletedContainer(ContainerId containerId);
+
+  /**
+   * Clear the list of recently completed containers
+   */
   public void clearFinishedContainersFromCache();
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Aug 19 23:49:39 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -246,13 +247,12 @@ public class NodeStatusUpdaterImpl exten
   @VisibleForTesting
   protected void registerWithRM()
       throws YarnException, IOException {
-    List<ContainerStatus> containerStatuses = getContainerStatuses();
+    List<NMContainerStatus> containerReports = getNMContainerStatuses();
     RegisterNodeManagerRequest request =
         RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
-          nodeManagerVersionId, containerStatuses);
-    if (containerStatuses != null) {
-      LOG.info("Registering with RM using finished containers :"
-          + containerStatuses);
+          nodeManagerVersionId, containerReports, getRunningApplications());
+    if (containerReports != null) {
+      LOG.info("Registering with RM using containers :" + containerReports);
     }
     RegisterNodeManagerResponse regNMResponse =
         resourceTracker.registerNodeManager(request);
@@ -364,8 +364,7 @@ public class NodeStatusUpdaterImpl exten
         // Adding to finished containers cache. Cache will keep it around at
         // least for #durationToTrackStoppedContainers duration. In the
         // subsequent call to stop container it will get removed from cache.
-        updateStoppedContainersInCache(container.getContainerId());
-        addCompletedContainer(container);
+        addCompletedContainer(container.getContainerId());
       }
     }
     if (LOG.isDebugEnabled()) {
@@ -374,10 +373,42 @@ public class NodeStatusUpdaterImpl exten
     }
     return containerStatuses;
   }
+  
+  private List<ApplicationId> getRunningApplications() {
+    // snapshot: the returned list is a copy of the context's application IDs
+    return new ArrayList<ApplicationId>(
+        this.context.getApplications().keySet());
+  }
+
+  // These NMContainerStatus are sent on NM registration and used by YARN only.
+  private List<NMContainerStatus> getNMContainerStatuses() {
+    List<NMContainerStatus> containerStatuses =
+        new ArrayList<NMContainerStatus>();
+    for (Container container : this.context.getContainers().values()) {
+      NMContainerStatus status =
+          container.getNMContainerStatus();
+      containerStatuses.add(status);
+      if (status.getContainerState().equals(ContainerState.COMPLETE)) {
+        // Adding to finished containers cache. Cache will keep it around at
+        // least for #durationToTrackStoppedContainers duration. In the
+        // subsequent call to stop container it will get removed from cache.
+        addCompletedContainer(container.getContainerId());
+      }
+    }
+    LOG.info("Sending out " + containerStatuses.size()
+      + " NM container statuses: " + containerStatuses);
+    return containerStatuses;
+  }
 
-  private void addCompletedContainer(Container container) {
+  @Override
+  public void addCompletedContainer(ContainerId containerId) {
     synchronized (previousCompletedContainers) {
-      previousCompletedContainers.add(container.getContainerId());
+      previousCompletedContainers.add(containerId);
+    }
+    synchronized (recentlyStoppedContainers) {
+      removeVeryOldStoppedContainersFromCache();
+      recentlyStoppedContainers.put(containerId,
+        System.currentTimeMillis() + durationToTrackStoppedContainers);
     }
   }
 
@@ -424,16 +455,6 @@ public class NodeStatusUpdaterImpl exten
     }
   }
   
-  @Private
-  @VisibleForTesting
-  public void updateStoppedContainersInCache(ContainerId containerId) {
-    synchronized (recentlyStoppedContainers) {
-      removeVeryOldStoppedContainersFromCache();
-      recentlyStoppedContainers.put(containerId,
-        System.currentTimeMillis() + durationToTrackStoppedContainers);
-    }
-  }
-  
   @Override
   public void clearFinishedContainersFromCache() {
     synchronized (recentlyStoppedContainers) {
@@ -449,8 +470,14 @@ public class NodeStatusUpdaterImpl exten
       Iterator<ContainerId> i =
           recentlyStoppedContainers.keySet().iterator();
       while (i.hasNext()) {
-        if (recentlyStoppedContainers.get(i.next()) < currentTime) {
+        ContainerId cid = i.next();
+        if (recentlyStoppedContainers.get(cid) < currentTime) {
           i.remove();
+          try {
+            context.getNMStateStore().removeContainer(cid);
+          } catch (IOException e) {
+            LOG.error("Unable to remove container " + cid + " in store", e);
+          }
         } else {
           break;
         }
@@ -493,6 +520,7 @@ public class NodeStatusUpdaterImpl exten
                     + " hence shutting down.");
               LOG.warn("Message from ResourceManager: "
                   + response.getDiagnosticsMessage());
+              context.setDecommissioned(true);
               dispatcher.getEventHandler().handle(
                   new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
               break;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ResourceView.java Tue Aug 19 23:49:39 2014
@@ -27,4 +27,6 @@ public interface ResourceView {
   long getPmemAllocatedForContainers();
 
   boolean isPmemCheckEnabled();
+
+  long getVCoresAllocatedForContainers();
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Aug 19 23:49:39 2014
@@ -20,8 +20,10 @@ package org.apache.hadoop.yarn.server.no
 
 import static org.apache.hadoop.service.Service.STATE.STARTED;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,6 +43,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
@@ -62,13 +65,17 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.SerializedException;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -79,6 +86,8 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
@@ -116,11 +125,16 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
 
 public class ContainerManagerImpl extends CompositeService implements
     ServiceStateChangeListener, ContainerManagementProtocol,
@@ -218,6 +232,108 @@ public class ContainerManagerImpl extend
         SHUTDOWN_CLEANUP_SLOP_MS;
 
     super.serviceInit(conf);
+    recover();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void recover() throws IOException, URISyntaxException {
+    NMStateStoreService stateStore = context.getNMStateStore();
+    if (stateStore.canRecover()) {
+      rsrcLocalizationSrvc.recoverLocalizedResources(
+          stateStore.loadLocalizationState());
+
+      RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
+      for (ContainerManagerApplicationProto proto :
+           appsState.getApplications()) {
+        recoverApplication(proto);
+      }
+
+      for (RecoveredContainerState rcs : stateStore.loadContainersState()) {
+        recoverContainer(rcs);
+      }
+
+      String diagnostic = "Application marked finished during recovery";
+      for (ApplicationId appId : appsState.getFinishedApplications()) {
+        dispatcher.getEventHandler().handle(
+            new ApplicationFinishEvent(appId, diagnostic));
+      }
+    }
+  }
+
+  private void recoverApplication(ContainerManagerApplicationProto p)
+      throws IOException {
+    ApplicationId appId = new ApplicationIdPBImpl(p.getId());
+    Credentials creds = new Credentials();
+    creds.readTokenStorageStream(
+        new DataInputStream(p.getCredentials().newInput()));
+
+    List<ApplicationACLMapProto> aclProtoList = p.getAclsList();
+    Map<ApplicationAccessType, String> acls =
+        new HashMap<ApplicationAccessType, String>(aclProtoList.size());
+    for (ApplicationACLMapProto aclProto : aclProtoList) {
+      acls.put(ProtoUtils.convertFromProtoFormat(aclProto.getAccessType()),
+          aclProto.getAcl());
+    }
+
+    LOG.info("Recovering application " + appId);
+    ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
+        creds, context);
+    context.getApplications().put(appId, app);
+    app.handle(new ApplicationInitEvent(appId, acls));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void recoverContainer(RecoveredContainerState rcs)
+      throws IOException {
+    StartContainerRequest req = rcs.getStartRequest();
+    ContainerLaunchContext launchContext = req.getContainerLaunchContext();
+    ContainerTokenIdentifier token =
+        BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+    ContainerId containerId = token.getContainerID();
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+
+    LOG.info("Recovering " + containerId + " in state " + rcs.getStatus()
+        + " with exit code " + rcs.getExitCode());
+
+    if (context.getApplications().containsKey(appId)) {
+      Credentials credentials = parseCredentials(launchContext);
+      Container container = new ContainerImpl(getConfig(), dispatcher,
+          context.getNMStateStore(), req.getContainerLaunchContext(),
+          credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
+          rcs.getDiagnostics(), rcs.getKilled());
+      context.getContainers().put(containerId, container);
+      dispatcher.getEventHandler().handle(
+          new ApplicationContainerInitEvent(container));
+    } else {
+      if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
+        LOG.warn(containerId + " has no corresponding application!");
+      }
+      LOG.info("Adding " + containerId + " to recently stopped containers");
+      nodeStatusUpdater.addCompletedContainer(containerId);
+    }
+  }
+
+  /**
+   * Polls until no container is left in the NEW state, up to 100 attempts.
+   */
+  private void waitForRecoveredContainers() throws InterruptedException {
+    final int sleepMsec = 100;
+    final int maxPolls = 100;
+    for (int poll = 0; poll < maxPolls; ++poll) {
+      List<ContainerId> pending = new ArrayList<ContainerId>();
+      for (Container c : context.getContainers().values()) {
+        if (c.getContainerState() == org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.NEW) {
+          pending.add(c.getContainerId());
+        }
+      }
+      if (pending.isEmpty()) {
+        return;
+      }
+      LOG.info("Waiting for containers: " + pending);
+      Thread.sleep(sleepMsec);
+    }
+    LOG.warn("Timeout waiting for recovered containers");
   }
 
   protected LogHandler createLogHandler(Configuration conf, Context context,
@@ -239,7 +355,7 @@ public class ContainerManagerImpl extend
   protected ResourceLocalizationService createResourceLocalizationService(
       ContainerExecutor exec, DeletionService deletionContext) {
     return new ResourceLocalizationService(this.dispatcher, exec,
-        deletionContext, dirsHandler);
+        deletionContext, dirsHandler, context.getNMStateStore());
   }
 
   protected ContainersLauncher createContainersLauncher(Context context,
@@ -253,6 +369,23 @@ public class ContainerManagerImpl extend
     // Enqueue user dirs in deletion context
 
     Configuration conf = getConfig();
+    final InetSocketAddress initialAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_PORT);
+    boolean usingEphemeralPort = (initialAddress.getPort() == 0);
+    if (context.getNMStateStore().canRecover() && usingEphemeralPort) {
+      throw new IllegalArgumentException("Cannot support recovery with an "
+          + "ephemeral server port. Check the setting of "
+          + YarnConfiguration.NM_ADDRESS);
+    }
+    // If recovering then delay opening the RPC service until the recovery
+    // of resources and containers has completed, otherwise requests from
+    // clients during recovery can interfere with the recovery process.
+    final boolean delayedRpcServerStart =
+        context.getNMStateStore().canRecover();
+
     Configuration serverConf = new Configuration(conf);
 
     // always enforce it to be token-based.
@@ -262,11 +395,6 @@ public class ContainerManagerImpl extend
     
     YarnRPC rpc = YarnRPC.create(conf);
 
-    InetSocketAddress initialAddress = conf.getSocketAddr(
-        YarnConfiguration.NM_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_PORT);
-
     server =
         rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, 
             serverConf, this.context.getNMTokenSecretManager(),
@@ -283,16 +411,61 @@ public class ContainerManagerImpl extend
     LOG.info("Blocking new container-requests as container manager rpc" +
     		" server is still starting.");
     this.setBlockNewContainerRequests(true);
-    server.start();
-    InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
-    NodeId nodeId = NodeId.newInstance(
-        connectAddress.getAddress().getCanonicalHostName(),
-        connectAddress.getPort());
+
+    String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
+    String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
+    String hostOverride = null;
+    if (bindHost != null && !bindHost.isEmpty()
+        && nmAddress != null && !nmAddress.isEmpty()) {
+      // A bind host is configured together with an explicit NM address.
+      // To override the first hostname found when querying for our own
+      // hostname, take the host from the specified address and combine it
+      // with the actual port the server listens on.
+      hostOverride = nmAddress.split(":")[0];
+    }
+
+    // setup node ID
+    InetSocketAddress connectAddress;
+    if (delayedRpcServerStart) {
+      connectAddress = NetUtils.getConnectAddress(initialAddress);
+    } else {
+      server.start();
+      connectAddress = NetUtils.getConnectAddress(server);
+    }
+    NodeId nodeId = buildNodeId(connectAddress, hostOverride);
     ((NodeManager.NMContext)context).setNodeId(nodeId);
     this.context.getNMTokenSecretManager().setNodeId(nodeId);
     this.context.getContainerTokenSecretManager().setNodeId(nodeId);
-    LOG.info("ContainerManager started at " + connectAddress);
+
+    // start remaining services
     super.serviceStart();
+
+    if (delayedRpcServerStart) {
+      waitForRecoveredContainers();
+      server.start();
+
+      // check that the node ID is as previously advertised
+      connectAddress = NetUtils.getConnectAddress(server);
+      NodeId serverNode = buildNodeId(connectAddress, hostOverride);
+      if (!serverNode.equals(nodeId)) {
+        throw new IOException("Node mismatch after server started, expected '"
+            + nodeId + "' but found '" + serverNode + "'");
+      }
+    }
+
+    LOG.info("ContainerManager started at " + connectAddress);
+    LOG.info("ContainerManager bound to " + initialAddress);
+  }
+
+  /** Builds the NodeId, taking the host from hostOverride when it is set. */
+  private NodeId buildNodeId(InetSocketAddress connectAddress,
+      String hostOverride) {
+    final InetSocketAddress advertised = (hostOverride == null)
+        ? connectAddress
+        : NetUtils.getConnectAddress(
+            new InetSocketAddress(hostOverride, connectAddress.getPort()));
+    final String host = advertised.getAddress().getCanonicalHostName();
+    return NodeId.newInstance(host, advertised.getPort());
   }
 
   void refreshServiceAcls(Configuration configuration, 
@@ -329,6 +502,12 @@ public class ContainerManagerImpl extend
     }
     LOG.info("Applications still running : " + applications.keySet());
 
+    if (this.context.getNMStateStore().canRecover()
+        && !this.context.getDecommissioned()) {
+      // do not cleanup apps as they can be recovered on restart
+      return;
+    }
+
     List<ApplicationId> appIds =
         new ArrayList<ApplicationId>(applications.keySet());
     this.handle(
@@ -463,8 +642,8 @@ public class ContainerManagerImpl extend
     boolean unauthorized = false;
     StringBuilder messageBuilder =
         new StringBuilder("Unauthorized request to start container. ");
-    if (!nmTokenIdentifier.getApplicationAttemptId().equals(
-        containerId.getApplicationAttemptId())) {
+    if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().equals(
+        containerId.getApplicationAttemptId().getApplicationId())) {
       unauthorized = true;
       messageBuilder.append("\nNMToken for application attempt : ")
         .append(nmTokenIdentifier.getApplicationAttemptId())
@@ -485,6 +664,8 @@ public class ContainerManagerImpl extend
       messageBuilder.append("\nThis token is expired. current time is ")
         .append(System.currentTimeMillis()).append(" found ")
         .append(containerTokenIdentifier.getExpiryTimeStamp());
+      messageBuilder.append("\nNote: System times on machines may be out of sync.")
+        .append(" Check system time and time zones.");
     }
     if (unauthorized) {
       String msg = messageBuilder.toString();
@@ -536,6 +717,41 @@ public class ContainerManagerImpl extend
       succeededContainers, failedContainers);
   }
 
+  /**
+   * Builds the record persisted in the NM state store for an application.
+   */
+  private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
+      String user, Credentials credentials,
+      Map<ApplicationAccessType, String> appAcls) {
+    ContainerManagerApplicationProto.Builder builder =
+        ContainerManagerApplicationProto.newBuilder();
+    builder.setId(((ApplicationIdPBImpl) appId).getProto());
+    builder.setUser(user);
+
+    if (credentials != null) {
+      DataOutputBuffer dob = new DataOutputBuffer();
+      try {
+        credentials.writeTokenStorageToStream(dob);
+        // getData() is the raw backing array; only getLength() bytes are valid
+        builder.setCredentials(
+            ByteString.copyFrom(dob.getData(), 0, dob.getLength()));
+      } catch (IOException e) {
+        // should not occur
+        LOG.error("Cannot serialize credentials", e);
+      }
+    }
+
+    if (appAcls != null) {
+      for (Map.Entry<ApplicationAccessType, String> acl : appAcls.entrySet()) {
+        builder.addAcls(ApplicationACLMapProto.newBuilder()
+            .setAccessType(ProtoUtils.convertToProtoFormat(acl.getKey()))
+            .setAcl(acl.getValue())
+            .build());
+      }
+    }
+    return builder.build();
+  }
+
   @SuppressWarnings("unchecked")
   private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
       ContainerTokenIdentifier containerTokenIdentifier,
@@ -588,7 +804,8 @@ public class ContainerManagerImpl extend
     Credentials credentials = parseCredentials(launchContext);
 
     Container container =
-        new ContainerImpl(getConfig(), this.dispatcher, launchContext,
+        new ContainerImpl(getConfig(), this.dispatcher,
+            context.getNMStateStore(), launchContext,
           credentials, metrics, containerTokenIdentifier);
     ApplicationId applicationID =
         containerId.getApplicationAttemptId().getApplicationId();
@@ -609,12 +826,15 @@ public class ContainerManagerImpl extend
         if (null == context.getApplications().putIfAbsent(applicationID,
           application)) {
           LOG.info("Creating a new application reference for app " + applicationID);
-
+          Map<ApplicationAccessType, String> appAcls =
+              container.getLaunchContext().getApplicationACLs();
+          context.getNMStateStore().storeApplication(applicationID,
+              buildAppProto(applicationID, user, credentials, appAcls));
           dispatcher.getEventHandler().handle(
-            new ApplicationInitEvent(applicationID, container.getLaunchContext()
-              .getApplicationACLs()));
+            new ApplicationInitEvent(applicationID, appAcls));
         }
 
+        this.context.getNMStateStore().storeContainer(containerId, request);
         dispatcher.getEventHandler().handle(
           new ApplicationContainerInitEvent(container));
 
@@ -662,7 +882,7 @@ public class ContainerManagerImpl extend
   }
 
   private Credentials parseCredentials(ContainerLaunchContext launchContext)
-      throws YarnException {
+      throws IOException {
     Credentials credentials = new Credentials();
     // //////////// Parse credentials
     ByteBuffer tokens = launchContext.getTokens();
@@ -671,15 +891,11 @@ public class ContainerManagerImpl extend
       DataInputByteBuffer buf = new DataInputByteBuffer();
       tokens.rewind();
       buf.reset(tokens);
-      try {
-        credentials.readTokenStorageStream(buf);
-        if (LOG.isDebugEnabled()) {
-          for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
-            LOG.debug(tk.getService() + " = " + tk.toString());
-          }
+      credentials.readTokenStorageStream(buf);
+      if (LOG.isDebugEnabled()) {
+        for (Token<? extends TokenIdentifier> tk : credentials.getAllTokens()) {
+          LOG.debug(tk.getService() + " = " + tk.toString());
         }
-      } catch (IOException e) {
-        throw RPCUtil.getRemoteException(e);
       }
     }
     // //////////// End of parsing credentials
@@ -712,7 +928,7 @@ public class ContainerManagerImpl extend
 
   @SuppressWarnings("unchecked")
   private void stopContainerInternal(NMTokenIdentifier nmTokenIdentifier,
-      ContainerId containerID) throws YarnException {
+      ContainerId containerID) throws YarnException, IOException {
     String containerIDStr = containerID.toString();
     Container container = this.context.getContainers().get(containerID);
     LOG.info("Stopping container with container Id: " + containerIDStr);
@@ -725,9 +941,11 @@ public class ContainerManagerImpl extend
           + " is not handled by this NodeManager");
       }
     } else {
+      context.getNMStateStore().storeContainerKilled(containerID);
       dispatcher.getEventHandler().handle(
         new ContainerKillEvent(containerID,
-          "Container killed by the ApplicationMaster."));
+            ContainerExitStatus.KILLED_BY_APPMASTER,
+            "Container killed by the ApplicationMaster."));
 
       NMAuditLogger.logSuccess(container.getUser(),    
         AuditConstants.STOP_CONTAINER, "ContainerManageImpl", containerID
@@ -797,26 +1015,24 @@ public class ContainerManagerImpl extend
      * belongs to the same application attempt (NMToken) which was used. (Note:-
      * This will prevent user in knowing another application's containers).
      */
-
-    if ((!identifier.getApplicationAttemptId().equals(
-      containerId.getApplicationAttemptId()))
-        || (container != null && !identifier.getApplicationAttemptId().equals(
-          container.getContainerId().getApplicationAttemptId()))) {
+    ApplicationId nmTokenAppId =
+        identifier.getApplicationAttemptId().getApplicationId();
+    if ((!nmTokenAppId.equals(containerId.getApplicationAttemptId().getApplicationId()))
+        || (container != null && !nmTokenAppId.equals(container
+          .getContainerId().getApplicationAttemptId().getApplicationId()))) {
       if (stopRequest) {
         LOG.warn(identifier.getApplicationAttemptId()
             + " attempted to stop non-application container : "
-            + container.getContainerId().toString());
+            + container.getContainerId());
         NMAuditLogger.logFailure("UnknownUser", AuditConstants.STOP_CONTAINER,
           "ContainerManagerImpl", "Trying to stop unknown container!",
-          identifier.getApplicationAttemptId().getApplicationId(),
-          container.getContainerId());
+          nmTokenAppId, container.getContainerId());
       } else {
         LOG.warn(identifier.getApplicationAttemptId()
             + " attempted to get status for non-application container : "
-            + container.getContainerId().toString());
+            + container.getContainerId());
       }
     }
-
   }
 
   class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
@@ -864,6 +1080,11 @@ public class ContainerManagerImpl extend
         } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
           diagnostic = "Application killed by ResourceManager";
         }
+        try {
+          this.context.getNMStateStore().storeFinishedApplication(appID);
+        } catch (IOException e) {
+          LOG.error("Unable to update application state in store", e);
+        }
         this.dispatcher.getEventHandler().handle(
             new ApplicationFinishEvent(appID,
                 diagnostic));
@@ -876,6 +1097,7 @@ public class ContainerManagerImpl extend
           .getContainersToCleanup()) {
           this.dispatcher.getEventHandler().handle(
               new ContainerKillEvent(container,
+                  ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
                   "Container Killed by ResourceManager"));
       }
       break;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
+import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,6 +31,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
@@ -375,6 +377,7 @@ public class ApplicationImpl implements 
       for (ContainerId containerID : app.containers.keySet()) {
         app.dispatcher.getEventHandler().handle(
             new ContainerKillEvent(containerID,
+                ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
                 "Container killed on application-finish event: " + appEvent.getDiagnostic()));
       }
       return ApplicationState.FINISHING_CONTAINERS_WAIT;
@@ -426,6 +429,11 @@ public class ApplicationImpl implements 
       ApplicationId appId = event.getApplicationID();
       app.context.getApplications().remove(appId);
       app.aclsManager.removeApplication(appId);
+      try {
+        app.context.getNMStateStore().removeApplication(appId);
+      } catch (IOException e) {
+        LOG.error("Unable to remove application from state store", e);
+      }
     }
   }
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java Tue Aug 19 23:49:39 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 
 public interface Container extends EventHandler<ContainerEvent> {
 
@@ -39,7 +40,7 @@ public interface Container extends Event
   ContainerTokenIdentifier getContainerTokenIdentifier();
 
   String getUser();
-
+  
   ContainerState getContainerState();
 
   ContainerLaunchContext getLaunchContext();
@@ -50,6 +51,8 @@ public interface Container extends Event
 
   ContainerStatus cloneAndGetContainerStatus();
 
+  NMContainerStatus getNMContainerStatus();
+
   String toString();
 
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
+import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -47,7 +48,7 @@ import org.apache.hadoop.yarn.conf.YarnC
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -62,6 +63,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -75,6 +78,7 @@ public class ContainerImpl implements Co
   private final Lock readLock;
   private final Lock writeLock;
   private final Dispatcher dispatcher;
+  private final NMStateStoreService stateStore;
   private final Credentials credentials;
   private final NodeManagerMetrics metrics;
   private final ContainerLaunchContext launchContext;
@@ -101,12 +105,19 @@ public class ContainerImpl implements Co
   private final List<LocalResourceRequest> appRsrcs =
     new ArrayList<LocalResourceRequest>();
 
+  // container status recovered after a restart (defaults to REQUESTED)
+  private RecoveredContainerStatus recoveredStatus =
+      RecoveredContainerStatus.REQUESTED;
+  // whether container was marked as killed after recovery
+  private boolean recoveredAsKilled = false;
+
   public ContainerImpl(Configuration conf, Dispatcher dispatcher,
-      ContainerLaunchContext launchContext, Credentials creds,
-      NodeManagerMetrics metrics,
+      NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+      Credentials creds, NodeManagerMetrics metrics,
       ContainerTokenIdentifier containerTokenIdentifier) {
     this.daemonConf = conf;
     this.dispatcher = dispatcher;
+    this.stateStore = stateStore;
     this.launchContext = launchContext;
     this.containerTokenIdentifier = containerTokenIdentifier;
     this.containerId = containerTokenIdentifier.getContainerID();
@@ -122,6 +133,21 @@ public class ContainerImpl implements Co
     stateMachine = stateMachineFactory.make(this);
   }
 
+  // constructor for a recovered container: normal setup, then recovered state
+  public ContainerImpl(Configuration conf, Dispatcher dispatcher,
+      NMStateStoreService stateStore, ContainerLaunchContext launchContext,
+      Credentials creds, NodeManagerMetrics metrics,
+      ContainerTokenIdentifier containerTokenIdentifier,
+      RecoveredContainerStatus recoveredStatus, int exitCode,
+      String diagnostics, boolean wasKilled) {
+    this(conf, dispatcher, stateStore, launchContext, creds, metrics,
+        containerTokenIdentifier);
+    this.recoveredStatus = recoveredStatus;
+    this.exitCode = exitCode;
+    this.recoveredAsKilled = wasKilled;
+    this.diagnostics.append(diagnostics);
+  }
+
   private static final ContainerDoneTransition CONTAINER_DONE_TRANSITION =
     new ContainerDoneTransition();
 
@@ -135,8 +161,10 @@ public class ContainerImpl implements Co
       new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
     // From NEW State
     .addTransition(ContainerState.NEW,
-        EnumSet.of(ContainerState.LOCALIZING, ContainerState.LOCALIZED,
-            ContainerState.LOCALIZATION_FAILED),
+        EnumSet.of(ContainerState.LOCALIZING,
+            ContainerState.LOCALIZED,
+            ContainerState.LOCALIZATION_FAILED,
+            ContainerState.DONE),
         ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
     .addTransition(ContainerState.NEW, ContainerState.NEW,
         ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
@@ -281,7 +309,9 @@ public class ContainerImpl implements Co
         UPDATE_DIAGNOSTICS_TRANSITION)
     .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
         ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
-        ContainerEventType.KILL_CONTAINER)
+        EnumSet.of(ContainerEventType.KILL_CONTAINER,
+            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
 
     // From DONE
     .addTransition(ContainerState.DONE, ContainerState.DONE,
@@ -295,7 +325,9 @@ public class ContainerImpl implements Co
     // we notify container of failed localization if localizer thread (for
     // that container) fails for some reason
     .addTransition(ContainerState.DONE, ContainerState.DONE,
-        ContainerEventType.RESOURCE_FAILED)
+        EnumSet.of(ContainerEventType.RESOURCE_FAILED,
+            ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
 
     // create the topology tables
     .installTopology();
@@ -388,6 +420,19 @@ public class ContainerImpl implements Co
   }
 
   @Override
+  public NMContainerStatus getNMContainerStatus() {
+    this.readLock.lock(); // build the status while holding the read lock
+    try {
+      return NMContainerStatus.newInstance(this.containerId, getCurrentState(),
+        getResource(), diagnostics.toString(), exitCode,
+        containerTokenIdentifier.getPriority(),
+        containerTokenIdentifier.getCreationTime());
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
   public ContainerId getContainerId() {
     return this.containerId;
   }
@@ -407,7 +452,7 @@ public class ContainerImpl implements Co
     }
   }
 
-  @SuppressWarnings({"fallthrough", "unchecked"})
+  @SuppressWarnings("fallthrough")
   private void finished() {
     ApplicationId applicationId =
         containerId.getApplicationAttemptId().getApplicationId();
@@ -445,7 +490,11 @@ public class ContainerImpl implements Co
     }
 
     metrics.releaseContainer(this.resource);
+    sendFinishedEvents();
+  }
 
+  @SuppressWarnings("unchecked")
+  private void sendFinishedEvents() {
     // Inform the application
     @SuppressWarnings("rawtypes")
     EventHandler eventHandler = dispatcher.getEventHandler();
@@ -458,6 +507,45 @@ public class ContainerImpl implements Co
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed
+  private void sendLaunchEvent() {
+    // A container launched before the restart is recovered, not relaunched.
+    boolean alreadyLaunched =
+        recoveredStatus == RecoveredContainerStatus.LAUNCHED;
+    ContainersLauncherEventType eventType = alreadyLaunched
+        ? ContainersLauncherEventType.RECOVER_CONTAINER
+        : ContainersLauncherEventType.LAUNCH_CONTAINER;
+    dispatcher.getEventHandler().handle(
+        new ContainersLauncherEvent(this, eventType));
+  }
+
+  // Inform the ContainersMonitor to start monitoring the container's resource
+  // usage: physical limit is the container memory, virtual limit is that
+  // value scaled by the configured vmem/pmem ratio.
+  @SuppressWarnings("unchecked") // dispatcher not typed
+  private void sendContainerMonitorStartEvent() {
+    // Widen to long before multiplying so large values cannot overflow int.
+    long pmemBytes = getResource().getMemory() * 1024L * 1024L;
+    float pmemRatio = daemonConf.getFloat(
+        YarnConfiguration.NM_VMEM_PMEM_RATIO,
+        YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+    long vmemBytes = (long) (pmemRatio * pmemBytes);
+    dispatcher.getEventHandler().handle(
+        new ContainerStartMonitoringEvent(containerId, vmemBytes, pmemBytes));
+  }
+
+  private void addDiagnostics(String... diags) {
+    for (String fragment : diags) {
+      diagnostics.append(fragment);
+    }
+    try { // persist the accumulated text; a failed store is only logged
+      stateStore.storeContainerDiagnostics(containerId, diagnostics);
+    } catch (IOException ioe) {
+      LOG.warn("Unable to update diagnostics in state store for "
+          + containerId, ioe);
+    }
+  }
+
+  @SuppressWarnings("unchecked") // dispatcher not typed
   public void cleanup() {
     Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrc =
       new HashMap<LocalResourceVisibility, 
@@ -505,6 +593,16 @@ public class ContainerImpl implements Co
     @Override
     public ContainerState transition(ContainerImpl container,
         ContainerEvent event) {
+      if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
+        container.sendFinishedEvents();
+        return ContainerState.DONE;
+      } else if (container.recoveredAsKilled &&
+          container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
+        // container was killed but never launched
+        container.finished();
+        return ContainerState.DONE;
+      }
+
       final ContainerLaunchContext ctxt = container.launchContext;
       container.metrics.initingContainer();
 
@@ -580,9 +678,7 @@ public class ContainerImpl implements Co
               new ContainerLocalizationRequestEvent(container, req));
         return ContainerState.LOCALIZING;
       } else {
-        container.dispatcher.getEventHandler().handle(
-            new ContainersLauncherEvent(container,
-                ContainersLauncherEventType.LAUNCH_CONTAINER));
+        container.sendLaunchEvent();
         container.metrics.endInitingContainer();
         return ContainerState.LOCALIZED;
       }
@@ -593,7 +689,6 @@ public class ContainerImpl implements Co
    * Transition when one of the requested resources for this container
    * has been successfully localized.
    */
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LocalizedTransition implements
       MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
     @Override
@@ -613,9 +708,8 @@ public class ContainerImpl implements Co
       if (!container.pendingResources.isEmpty()) {
         return ContainerState.LOCALIZING;
       }
-      container.dispatcher.getEventHandler().handle(
-          new ContainersLauncherEvent(container,
-              ContainersLauncherEventType.LAUNCH_CONTAINER));
+
+      container.sendLaunchEvent();
       container.metrics.endInitingContainer();
       return ContainerState.LOCALIZED;
     }
@@ -625,24 +719,22 @@ public class ContainerImpl implements Co
    * Transition from LOCALIZED state to RUNNING state upon receiving
    * a CONTAINER_LAUNCHED event
    */
-  @SuppressWarnings("unchecked") // dispatcher not typed
   static class LaunchTransition extends ContainerTransition {
+    @SuppressWarnings("unchecked")
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
-      // Inform the ContainersMonitor to start monitoring the container's
-      // resource usage.
-      long pmemBytes =
-          container.getResource().getMemory() * 1024 * 1024L;
-      float pmemRatio = container.daemonConf.getFloat(
-          YarnConfiguration.NM_VMEM_PMEM_RATIO,
-          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
-      long vmemBytes = (long) (pmemRatio * pmemBytes);
-      
-      container.dispatcher.getEventHandler().handle(
-          new ContainerStartMonitoringEvent(container.containerId,
-              vmemBytes, pmemBytes));
+      container.sendContainerMonitorStartEvent();
       container.metrics.runningContainer();
       container.wasLaunched  = true;
+
+      if (container.recoveredAsKilled) {
+        LOG.info("Killing " + container.containerId
+            + " due to recovered as killed");
+        container.addDiagnostics("Container recovered as killed.\n");
+        container.dispatcher.getEventHandler().handle(
+            new ContainersLauncherEvent(container,
+                ContainersLauncherEventType.CLEANUP_CONTAINER));
+      }
     }
   }
 
@@ -694,8 +786,7 @@ public class ContainerImpl implements Co
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
       container.exitCode = exitEvent.getExitCode();
       if (exitEvent.getDiagnosticInfo() != null) {
-        container.diagnostics.append(exitEvent.getDiagnosticInfo())
-          .append('\n');
+        container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
       }
 
       // TODO: Add containerWorkDir to the deletion service.
@@ -722,7 +813,7 @@ public class ContainerImpl implements Co
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       super.transition(container, event);
-      container.diagnostics.append("Killed by external signal\n");
+      container.addDiagnostics("Killed by external signal\n");
     }
   }
 
@@ -737,9 +828,7 @@ public class ContainerImpl implements Co
 
       ContainerResourceFailedEvent rsrcFailedEvent =
           (ContainerResourceFailedEvent) event;
-      container.diagnostics.append(rsrcFailedEvent.getDiagnosticMessage()
-          + "\n");
-          
+      container.addDiagnostics(rsrcFailedEvent.getDiagnosticMessage(), "\n");
 
       // Inform the localizer to decrement reference counts and cleanup
       // resources.
@@ -761,9 +850,9 @@ public class ContainerImpl implements Co
       container.cleanup();
       container.metrics.endInitingContainer();
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
-      container.exitCode = ExitCode.TERMINATED.getExitCode();
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
-      container.diagnostics.append("Container is killed before being launched.\n");
+      container.exitCode = killEvent.getContainerExitStatus();
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+      container.addDiagnostics("Container is killed before being launched.\n");
     }
   }
 
@@ -804,7 +893,8 @@ public class ContainerImpl implements Co
           new ContainersLauncherEvent(container,
               ContainersLauncherEventType.CLEANUP_CONTAINER));
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+      container.exitCode = killEvent.getContainerExitStatus();
     }
   }
 
@@ -817,10 +907,12 @@ public class ContainerImpl implements Co
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
-      container.exitCode = exitEvent.getExitCode();
+      if (container.hasDefaultExitCode()) {
+        container.exitCode = exitEvent.getExitCode();
+      }
+
       if (exitEvent.getDiagnosticInfo() != null) {
-        container.diagnostics.append(exitEvent.getDiagnosticInfo())
-          .append('\n');
+        container.addDiagnostics(exitEvent.getDiagnosticInfo(), "\n");
       }
 
       // The process/process-grp is killed. Decrement reference counts and
@@ -859,9 +951,9 @@ public class ContainerImpl implements Co
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerKillEvent killEvent = (ContainerKillEvent) event;
-      container.exitCode = ExitCode.TERMINATED.getExitCode();
-      container.diagnostics.append(killEvent.getDiagnostic()).append("\n");
-      container.diagnostics.append("Container is killed before being launched.\n");
+      container.exitCode = killEvent.getContainerExitStatus();
+      container.addDiagnostics(killEvent.getDiagnostic(), "\n");
+      container.addDiagnostics("Container is killed before being launched.\n");
       super.transition(container, event);
     }
   }
@@ -875,8 +967,9 @@ public class ContainerImpl implements Co
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerDiagnosticsUpdateEvent updateEvent =
           (ContainerDiagnosticsUpdateEvent) event;
-      container.diagnostics.append(updateEvent.getDiagnosticsUpdate())
-          .append("\n");
+      // addDiagnostics() appends the text and also writes the diagnostics to
+      // the state store, so no separate store call is needed here.
+      container.addDiagnostics(updateEvent.getDiagnosticsUpdate(), "\n");
     }
   }
 
@@ -916,4 +1014,8 @@ public class ContainerImpl implements Co
       this.readLock.unlock();
     }
   }
+
+  private boolean hasDefaultExitCode() {
+    return ContainerExitStatus.INVALID == this.exitCode; // INVALID = default
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerKillEvent.java Tue Aug 19 23:49:39 2014
@@ -23,13 +23,21 @@ import org.apache.hadoop.yarn.api.record
 public class ContainerKillEvent extends ContainerEvent {
 
   private final String diagnostic;
+  private final int exitStatus;
 
-  public ContainerKillEvent(ContainerId cID, String diagnostic) {
+  public ContainerKillEvent(ContainerId cID,
+      int exitStatus, String diagnostic) {
     super(cID, ContainerEventType.KILL_CONTAINER);
+    this.exitStatus = exitStatus;
     this.diagnostic = diagnostic;
   }
 
   public String getDiagnostic() {
     return this.diagnostic;
   }
+
+  public int getContainerExitStatus() {
+    return this.exitStatus;
+  }
+
 }