You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC

svn commit: r1619012 [7/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Tue Aug 19 23:49:39 2014
@@ -87,22 +87,23 @@ public class ContainerLaunch implements 
   public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
 
   private static final String PID_FILE_NAME_FMT = "%s.pid";
+  private static final String EXIT_CODE_FILE_SUFFIX = ".exitcode";
 
-  private final Dispatcher dispatcher;
-  private final ContainerExecutor exec;
+  protected final Dispatcher dispatcher;
+  protected final ContainerExecutor exec;
   private final Application app;
-  private final Container container;
+  protected final Container container;
   private final Configuration conf;
   private final Context context;
   private final ContainerManagerImpl containerManager;
   
-  private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
-  private volatile AtomicBoolean completed = new AtomicBoolean(false);
+  protected AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
+  protected AtomicBoolean completed = new AtomicBoolean(false);
 
   private long sleepDelayBeforeSigKill = 250;
   private long maxKillWaitTime = 2000;
 
-  private Path pidFilePath = null;
+  protected Path pidFilePath = null;
 
   private final LocalDirsHandlerService dirsHandler;
 
@@ -223,14 +224,11 @@ public class ContainerLaunch implements 
               + Path.SEPARATOR + containerIdStr,
               LocalDirAllocator.SIZE_UNKNOWN, false);
 
-      String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
-          containerIdStr);
+      String pidFileSubpath = getPidFileSubpath(appIdStr, containerIdStr);
 
       // pid file should be in nm private dir so that it is not 
       // accessible by users
-      pidFilePath = dirsHandler.getLocalPathForWrite(
-          ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR 
-          + pidFileSuffix);
+      pidFilePath = dirsHandler.getLocalPathForWrite(pidFileSubpath);
       List<String> localDirs = dirsHandler.getLocalDirs();
       List<String> logDirs = dirsHandler.getLogDirs();
 
@@ -288,6 +286,7 @@ public class ContainerLaunch implements 
       dispatcher.getEventHandler().handle(new ContainerEvent(
             containerID,
             ContainerEventType.CONTAINER_LAUNCHED));
+      context.getNMStateStore().storeContainerLaunched(containerID);
 
       // Check if the container is signalled to be killed.
       if (!shouldLaunchContainer.compareAndSet(false, true)) {
@@ -310,6 +309,11 @@ public class ContainerLaunch implements 
     } finally {
       completed.set(true);
       exec.deactivateContainer(containerID);
+      try {
+        context.getNMStateStore().storeContainerCompleted(containerID, ret);
+      } catch (IOException e) {
+        LOG.error("Unable to set exit code for container " + containerID);
+      }
     }
 
     if (LOG.isDebugEnabled()) {
@@ -342,6 +346,11 @@ public class ContainerLaunch implements 
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
     return 0;
   }
+
+  protected String getPidFileSubpath(String appIdStr, String containerIdStr) {
+    return getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+        + String.format(ContainerLaunch.PID_FILE_NAME_FMT, containerIdStr);
+  }
   
   /**
    * Cleanup the container.
@@ -357,6 +366,13 @@ public class ContainerLaunch implements 
     String containerIdStr = ConverterUtils.toString(containerId);
     LOG.info("Cleaning up container " + containerIdStr);
 
+    try {
+      context.getNMStateStore().storeContainerKilled(containerId);
+    } catch (IOException e) {
+      LOG.error("Unable to mark container " + containerId
+          + " killed in store", e);
+    }
+
     // launch flag will be set to true if process already launched
     boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
     if (!alreadyLaunched) {
@@ -421,6 +437,7 @@ public class ContainerLaunch implements 
       if (pidFilePath != null) {
         FileContext lfs = FileContext.getLocalFSFileContext();
         lfs.delete(pidFilePath, false);
+        lfs.delete(pidFilePath.suffix(EXIT_CODE_FILE_SUFFIX), false);
       }
     }
   }
@@ -479,15 +496,24 @@ public class ContainerLaunch implements 
         + appIdStr;
   }
 
-  private static abstract class ShellScriptBuilder {
+  Context getContext() {
+    return context;
+  }
+
+  @VisibleForTesting
+  static abstract class ShellScriptBuilder {
+    public static ShellScriptBuilder create() {
+      return Shell.WINDOWS ? new WindowsShellScriptBuilder() :
+        new UnixShellScriptBuilder();
+    }
 
     private static final String LINE_SEPARATOR =
         System.getProperty("line.separator");
     private final StringBuilder sb = new StringBuilder();
 
-    public abstract void command(List<String> command);
+    public abstract void command(List<String> command) throws IOException;
 
-    public abstract void env(String key, String value);
+    public abstract void env(String key, String value) throws IOException;
 
     public final void symlink(Path src, Path dst) throws IOException {
       if (!src.isAbsolute()) {
@@ -520,11 +546,19 @@ public class ContainerLaunch implements 
 
     protected abstract void link(Path src, Path dst) throws IOException;
 
-    protected abstract void mkdir(Path path);
+    protected abstract void mkdir(Path path) throws IOException;
   }
 
   private static final class UnixShellScriptBuilder extends ShellScriptBuilder {
 
+    private void errorCheck() {
+      line("hadoop_shell_errorcode=$?");
+      line("if [ $hadoop_shell_errorcode -ne 0 ]");
+      line("then");
+      line("  exit $hadoop_shell_errorcode");
+      line("fi");
+    }
+
     public UnixShellScriptBuilder(){
       line("#!/bin/bash");
       line();
@@ -533,6 +567,7 @@ public class ContainerLaunch implements 
     @Override
     public void command(List<String> command) {
       line("exec /bin/bash -c \"", StringUtils.join(" ", command), "\"");
+      errorCheck();
     }
 
     @Override
@@ -543,31 +578,43 @@ public class ContainerLaunch implements 
     @Override
     protected void link(Path src, Path dst) throws IOException {
       line("ln -sf \"", src.toUri().getPath(), "\" \"", dst.toString(), "\"");
+      errorCheck();
     }
 
     @Override
     protected void mkdir(Path path) {
       line("mkdir -p ", path.toString());
+      errorCheck();
     }
   }
 
   private static final class WindowsShellScriptBuilder
       extends ShellScriptBuilder {
 
+    private void errorCheck() {
+      line("@if %errorlevel% neq 0 exit /b %errorlevel%");
+    }
+
+    private void lineWithLenCheck(String... commands) throws IOException {
+      Shell.checkWindowsCommandLineLength(commands);
+      line(commands);
+    }
+
     public WindowsShellScriptBuilder() {
       line("@setlocal");
       line();
     }
 
     @Override
-    public void command(List<String> command) {
-      line("@call ", StringUtils.join(" ", command));
+    public void command(List<String> command) throws IOException {
+      lineWithLenCheck("@call ", StringUtils.join(" ", command));
+      errorCheck();
     }
 
     @Override
-    public void env(String key, String value) {
-      line("@set ", key, "=", value,
-          "\nif %errorlevel% neq 0 exit /b %errorlevel%");
+    public void env(String key, String value) throws IOException {
+      lineWithLenCheck("@set ", key, "=", value);
+      errorCheck();
     }
 
     @Override
@@ -578,16 +625,20 @@ public class ContainerLaunch implements 
       // If not on Java7+ on Windows, then copy file instead of symlinking.
       // See also FileUtil#symLink for full explanation.
       if (!Shell.isJava7OrAbove() && srcFile.isFile()) {
-        line(String.format("@copy \"%s\" \"%s\"", srcFileStr, dstFileStr));
+        lineWithLenCheck(String.format("@copy \"%s\" \"%s\"", srcFileStr, dstFileStr));
+        errorCheck();
       } else {
-        line(String.format("@%s symlink \"%s\" \"%s\"", Shell.WINUTILS,
+        lineWithLenCheck(String.format("@%s symlink \"%s\" \"%s\"", Shell.WINUTILS,
           dstFileStr, srcFileStr));
+        errorCheck();
       }
     }
 
     @Override
-    protected void mkdir(Path path) {
-      line("@if not exist ", path.toString(), " mkdir ", path.toString());
+    protected void mkdir(Path path) throws IOException {
+      lineWithLenCheck(String.format("@if not exist \"%s\" mkdir \"%s\"",
+          path.toString(), path.toString()));
+      errorCheck();
     }
   }
 
@@ -730,8 +781,7 @@ public class ContainerLaunch implements 
       Map<String,String> environment, Map<Path,List<String>> resources,
       List<String> command)
       throws IOException {
-    ShellScriptBuilder sb = Shell.WINDOWS ? new WindowsShellScriptBuilder() :
-      new UnixShellScriptBuilder();
+    ShellScriptBuilder sb = ShellScriptBuilder.create();
     if (environment != null) {
       for (Map.Entry<String,String> env : environment.entrySet()) {
         sb.env(env.getKey().toString(), env.getValue().toString());
@@ -758,4 +808,7 @@ public class ContainerLaunch implements 
     }
   }
 
+  public static String getExitCodeFile(String pidFile) {
+    return pidFile + EXIT_CODE_FILE_SUFFIX;
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Tue Aug 19 23:49:39 2014
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,21 +31,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -107,7 +101,6 @@ public class ContainersLauncher extends 
     super.serviceStop();
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public void handle(ContainersLauncherEvent event) {
     // TODO: ContainersLauncher launches containers one by one!!
@@ -125,6 +118,14 @@ public class ContainersLauncher extends 
         containerLauncher.submit(launch);
         running.put(containerId, launch);
         break;
+      case RECOVER_CONTAINER:
+        app = context.getApplications().get(
+            containerId.getApplicationAttemptId().getApplicationId());
+        launch = new RecoveredContainerLaunch(context, getConfig(), dispatcher,
+            exec, app, event.getContainer(), dirsHandler, containerManager);
+        containerLauncher.submit(launch);
+        running.put(containerId, launch);
+        break;
       case CLEANUP_CONTAINER:
         ContainerLaunch launcher = running.remove(containerId);
         if (launcher == null) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEventType.java Tue Aug 19 23:49:39 2014
@@ -20,5 +20,6 @@ package org.apache.hadoop.yarn.server.no
 
 public enum ContainersLauncherEventType {
   LAUNCH_CONTAINER,
+  RECOVER_CONTAINER,
   CLEANUP_CONTAINER, // The process(grp) itself.
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalCacheDirectoryManager.java Tue Aug 19 23:49:39 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * {@link LocalCacheDirectoryManager} is used for managing hierarchical
  * directories for local cache. It will allow to restrict the number of files in
@@ -99,6 +101,57 @@ public class LocalCacheDirectoryManager 
     }
   }
 
+  /**
+   * Increment the file count for a relative directory within the cache
+   * 
+   * @param relPath the relative path
+   */
+  public synchronized void incrementFileCountForPath(String relPath) {
+    relPath = relPath == null ? "" : relPath.trim();
+    Directory subDir = knownDirectories.get(relPath);
+    if (subDir == null) {
+      int dirnum = Directory.getDirectoryNumber(relPath);
+      totalSubDirectories = Math.max(dirnum, totalSubDirectories);
+      subDir = new Directory(dirnum);
+      nonFullDirectories.add(subDir);
+      knownDirectories.put(subDir.getRelativePath(), subDir);
+    }
+    if (subDir.incrementAndGetCount() >= perDirectoryFileLimit) {
+      nonFullDirectories.remove(subDir);
+    }
+  }
+
+  /**
+   * Given a path to a directory within a local cache tree return the
+   * root of the cache directory.
+   * 
+   * @param path the directory within a cache directory
+   * @return the local cache directory root or null if not found
+   */
+  public static Path getCacheDirectoryRoot(Path path) {
+    while (path != null) {
+      String name = path.getName();
+      if (name.length() != 1) {
+        return path;
+      }
+      int dirnum = DIRECTORIES_PER_LEVEL;
+      try {
+        dirnum = Integer.parseInt(name, DIRECTORIES_PER_LEVEL);
+      } catch (NumberFormatException e) {
+      }
+      if (dirnum >= DIRECTORIES_PER_LEVEL) {
+        return path;
+      }
+      path = path.getParent();
+    }
+    return path;
+  }
+
+  @VisibleForTesting
+  synchronized Directory getDirectory(String relPath) {
+    return knownDirectories.get(relPath);
+  }
+
   /*
    * It limits the number of files and sub directories in the directory to the
    * limit LocalCacheDirectoryManager#perDirectoryFileLimit.
@@ -108,11 +161,9 @@ public class LocalCacheDirectoryManager 
     private final String relativePath;
     private int fileCount;
 
-    public Directory(int directoryNo) {
-      fileCount = 0;
-      if (directoryNo == 0) {
-        relativePath = "";
-      } else {
+    static String getRelativePath(int directoryNo) {
+      String relativePath = "";
+      if (directoryNo > 0) {
         String tPath = Integer.toString(directoryNo - 1, DIRECTORIES_PER_LEVEL);
         StringBuffer sb = new StringBuffer();
         if (tPath.length() == 1) {
@@ -128,6 +179,27 @@ public class LocalCacheDirectoryManager 
         }
         relativePath = sb.toString();
       }
+      return relativePath;
+    }
+
+    static int getDirectoryNumber(String relativePath) {
+      String numStr = relativePath.replace("/", "");
+      if (relativePath.isEmpty()) {
+        return 0;
+      }
+      if (numStr.length() > 1) {
+        // undo step from getRelativePath() to reuse 0th sub directory
+        String firstChar = Integer.toString(
+            Integer.parseInt(numStr.substring(0, 1),
+                DIRECTORIES_PER_LEVEL) + 1, DIRECTORIES_PER_LEVEL);
+        numStr = firstChar + numStr.substring(1);
+      }
+      return Integer.parseInt(numStr, DIRECTORIES_PER_LEVEL) + 1;
+    }
+
+    public Directory(int directoryNo) {
+      fileCount = 0;
+      relativePath = getRelativePath(directoryNo);
     }
 
     public int incrementAndGetCount() {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Tue Aug 19 23:49:39 2014
@@ -18,15 +18,12 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * Component tracking resources all of the same {@link LocalResourceVisibility}
  * 
@@ -34,18 +31,11 @@ import com.google.common.annotations.Vis
 interface LocalResourcesTracker
     extends EventHandler<ResourceEvent>, Iterable<LocalizedResource> {
 
-  // TODO: Not used at all!!
-  boolean contains(LocalResourceRequest resource);
-
   boolean remove(LocalizedResource req, DeletionService delService);
 
   Path getPathForLocalization(LocalResourceRequest req, Path localDirPath);
 
   String getUser();
 
-  long nextUniqueNumber();
-  
-  @VisibleForTesting
-  @Private
   LocalizedResource getLocalizedResource(LocalResourceRequest request);
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -27,14 +28,21 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -53,6 +61,7 @@ class LocalResourcesTrackerImpl implemen
       .compile(RANDOM_DIR_REGEX);
 
   private final String user;
+  private final ApplicationId appId;
   private final Dispatcher dispatcher;
   private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
   private Configuration conf;
@@ -77,17 +86,22 @@ class LocalResourcesTrackerImpl implemen
    * per APPLICATION, USER and PUBLIC cache.
    */
   private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
+  private NMStateStoreService stateStore;
 
-  public LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
-      boolean useLocalCacheDirectoryManager, Configuration conf) {
-    this(user, dispatcher,
+  public LocalResourcesTrackerImpl(String user, ApplicationId appId,
+      Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
+      Configuration conf, NMStateStoreService stateStore) {
+    this(user, appId, dispatcher,
       new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
-      useLocalCacheDirectoryManager, conf);
+      useLocalCacheDirectoryManager, conf, stateStore);
   }
 
-  LocalResourcesTrackerImpl(String user, Dispatcher dispatcher,
+  LocalResourcesTrackerImpl(String user, ApplicationId appId,
+      Dispatcher dispatcher,
       ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
-      boolean useLocalCacheDirectoryManager, Configuration conf) {
+      boolean useLocalCacheDirectoryManager, Configuration conf,
+      NMStateStoreService stateStore) {
+    this.appId = appId;
     this.user = user;
     this.dispatcher = dispatcher;
     this.localrsrc = localrsrc;
@@ -98,6 +112,7 @@ class LocalResourcesTrackerImpl implemen
         new ConcurrentHashMap<LocalResourceRequest, Path>();
     }
     this.conf = conf;
+    this.stateStore = stateStore;
   }
 
   /*
@@ -119,8 +134,7 @@ class LocalResourcesTrackerImpl implemen
       if (rsrc != null && (!isResourcePresent(rsrc))) {
         LOG.info("Resource " + rsrc.getLocalPath()
             + " is missing, localizing it again");
-        localrsrc.remove(req);
-        decrementFileCountForLocalCacheDirectory(req, rsrc);
+        removeResource(req);
         rsrc = null;
       }
       if (null == rsrc) {
@@ -141,15 +155,102 @@ class LocalResourcesTrackerImpl implemen
       }
       break;
     case LOCALIZATION_FAILED:
-      decrementFileCountForLocalCacheDirectory(req, null);
       /*
        * If resource localization fails then Localized resource will be
        * removed from local cache.
        */
-      localrsrc.remove(req);
+      removeResource(req);
+      break;
+    case RECOVERED:
+      if (rsrc != null) {
+        LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
+        return;
+      }
+      rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
+      localrsrc.put(req, rsrc);
       break;
     }
+
     rsrc.handle(event);
+
+    if (event.getType() == ResourceEventType.LOCALIZED) {
+      if (rsrc.getLocalPath() != null) {
+        try {
+          stateStore.finishResourceLocalization(user, appId,
+              buildLocalizedResourceProto(rsrc));
+        } catch (IOException ioe) {
+          LOG.error("Error storing resource state for " + rsrc, ioe);
+        }
+      } else {
+        LOG.warn("Resource " + rsrc + " localized without a location");
+      }
+    }
+  }
+
+  private LocalizedResource recoverResource(LocalResourceRequest req,
+      ResourceRecoveredEvent event) {
+    // unique number for a resource is the directory of the resource
+    Path localDir = event.getLocalPath().getParent();
+    long rsrcId = Long.parseLong(localDir.getName());
+
+    // update ID generator to avoid conflicts with existing resources
+    while (true) {
+      long currentRsrcId = uniqueNumberGenerator.get();
+      long nextRsrcId = Math.max(currentRsrcId, rsrcId);
+      if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) {
+        break;
+      }
+    }
+
+    incrementFileCountForLocalCacheDirectory(localDir.getParent());
+
+    return new LocalizedResource(req, dispatcher);
+  }
+
+  private LocalizedResourceProto buildLocalizedResourceProto(
+      LocalizedResource rsrc) {
+    return LocalizedResourceProto.newBuilder()
+        .setResource(buildLocalResourceProto(rsrc.getRequest()))
+        .setLocalPath(rsrc.getLocalPath().toString())
+        .setSize(rsrc.getSize())
+        .build();
+  }
+
+  private LocalResourceProto buildLocalResourceProto(LocalResource lr) {
+    LocalResourcePBImpl lrpb;
+    if (!(lr instanceof LocalResourcePBImpl)) {
+      lr = LocalResource.newInstance(lr.getResource(), lr.getType(),
+          lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
+          lr.getPattern());
+    }
+    lrpb = (LocalResourcePBImpl) lr;
+    return lrpb.getProto();
+  }
+
+  public void incrementFileCountForLocalCacheDirectory(Path cacheDir) {
+    if (useLocalCacheDirectoryManager) {
+      Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
+          cacheDir);
+      if (cacheRoot != null) {
+        LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot);
+        if (dir == null) {
+          dir = new LocalCacheDirectoryManager(conf);
+          LocalCacheDirectoryManager otherDir =
+              directoryManagers.putIfAbsent(cacheRoot, dir);
+          if (otherDir != null) {
+            dir = otherDir;
+          }
+        }
+        if (cacheDir.equals(cacheRoot)) {
+          dir.incrementFileCountForPath("");
+        } else {
+          String dirStr = cacheDir.toUri().getRawPath();
+          String rootStr = cacheRoot.toUri().getRawPath();
+          dir.incrementFileCountForPath(
+              dirStr.substring(rootStr.length() + 1));
+        }
+      }
+    }
   }
 
   /*
@@ -217,11 +318,6 @@ class LocalResourcesTrackerImpl implemen
   }
   
   @Override
-  public boolean contains(LocalResourceRequest resource) {
-    return localrsrc.containsKey(resource);
-  }
-
-  @Override
   public boolean remove(LocalizedResource rem, DeletionService delService) {
  // current synchronization guaranteed by crude RLS event for cleanup
     LocalizedResource rsrc = localrsrc.get(rem.getRequest());
@@ -237,16 +333,31 @@ class LocalResourcesTrackerImpl implemen
           + " with non-zero refcount");
       return false;
     } else { // ResourceState is LOCALIZED or INIT
-      localrsrc.remove(rem.getRequest());
       if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
         delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
       }
-      decrementFileCountForLocalCacheDirectory(rem.getRequest(), rsrc);
+      removeResource(rem.getRequest());
       LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
       return true;
     }
   }
 
+  private void removeResource(LocalResourceRequest req) {
+    LocalizedResource rsrc = localrsrc.remove(req);
+    decrementFileCountForLocalCacheDirectory(req, rsrc);
+    if (rsrc != null) {
+      Path localPath = rsrc.getLocalPath();
+      if (localPath != null) {
+        try {
+          stateStore.removeLocalizedResource(user, appId, localPath);
+        } catch (IOException e) {
+          LOG.error("Unable to remove resource " + rsrc + " from state store",
+              e);
+        }
+      }
+    }
+  }
+
   /**
    * Returns the path up to the random directory component.
    */
@@ -285,6 +396,7 @@ class LocalResourcesTrackerImpl implemen
   @Override
   public Path
       getPathForLocalization(LocalResourceRequest req, Path localDirPath) {
+    Path rPath = localDirPath;
     if (useLocalCacheDirectoryManager && localDirPath != null) {
 
       if (!directoryManagers.containsKey(localDirPath)) {
@@ -293,7 +405,7 @@ class LocalResourcesTrackerImpl implemen
       }
       LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
 
-      Path rPath = localDirPath;
+      rPath = localDirPath;
       String hierarchicalPath = dir.getRelativePathForLocalization();
       // For most of the scenarios we will get root path only which
       // is an empty string
@@ -301,21 +413,36 @@ class LocalResourcesTrackerImpl implemen
         rPath = new Path(localDirPath, hierarchicalPath);
       }
       inProgressLocalResourcesMap.put(req, rPath);
-      return rPath;
-    } else {
-      return localDirPath;
     }
-  }
 
-  @Override
-  public long nextUniqueNumber() {
-    return uniqueNumberGenerator.incrementAndGet();
+    rPath = new Path(rPath,
+        Long.toString(uniqueNumberGenerator.incrementAndGet()));
+    Path localPath = new Path(rPath, req.getPath().getName());
+    LocalizedResource rsrc = localrsrc.get(req);
+    rsrc.setLocalPath(localPath);
+    LocalResource lr = LocalResource.newInstance(req.getResource(),
+        req.getType(), req.getVisibility(), req.getSize(),
+        req.getTimestamp());
+    try {
+      stateStore.startResourceLocalization(user, appId,
+          ((LocalResourcePBImpl) lr).getProto(), localPath);
+    } catch (IOException e) {
+      LOG.error("Unable to record localization start for " + rsrc, e);
+    }
+    return rPath;
   }
 
-  @VisibleForTesting
-  @Private
   @Override
   public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
     return localrsrc.get(request);
   }
-}
\ No newline at end of file
+
+  @VisibleForTesting
+  LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) {
+    LocalCacheDirectoryManager mgr = null;
+    if (useLocalCacheDirectoryManager) {
+      mgr = directoryManagers.get(localDirPath);
+    }
+    return mgr;
+  }
+}

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Tue Aug 19 23:49:39 2014
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -54,8 +55,8 @@ public class LocalizedResource implement
 
   private static final Log LOG = LogFactory.getLog(LocalizedResource.class);
 
-  Path localPath;
-  long size = -1;
+  volatile Path localPath;
+  volatile long size = -1;
   final LocalResourceRequest rsrc;
   final Dispatcher dispatcher;
   final StateMachine<ResourceState,ResourceEventType,ResourceEvent>
@@ -76,6 +77,8 @@ public class LocalizedResource implement
     // From INIT (ref == 0, awaiting req)
     .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
         ResourceEventType.REQUEST, new FetchResourceTransition())
+    .addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
+        ResourceEventType.RECOVERED, new RecoveredTransition())
 
     // From DOWNLOADING (ref > 0, may be localizing)
     .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
@@ -157,6 +160,10 @@ public class LocalizedResource implement
     return localPath;
   }
 
+  public void setLocalPath(Path localPath) {
+    this.localPath = Path.getPathWithoutSchemeAndAuthority(localPath);
+  }
+
   public long getTimestamp() {
     return timestamp.get();
   }
@@ -234,7 +241,8 @@ public class LocalizedResource implement
     @Override
     public void transition(LocalizedResource rsrc, ResourceEvent event) {
       ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
-      rsrc.localPath = locEvent.getLocation();
+      rsrc.localPath =
+          Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
       rsrc.size = locEvent.getSize();
       for (ContainerId container : rsrc.ref) {
         rsrc.dispatcher.getEventHandler().handle(
@@ -291,4 +299,13 @@ public class LocalizedResource implement
       rsrc.release(relEvent.getContainer());
     }
   }
+
+  private static class RecoveredTransition extends ResourceTransition {
+    @Override
+    public void transition(LocalizedResource rsrc, ResourceEvent event) {
+      ResourceRecoveredEvent recoveredEvent = (ResourceRecoveredEvent) event;
+      rsrc.localPath = recoveredEvent.getLocalPath();
+      rsrc.size = recoveredEvent.getSize();
+    }
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Tue Aug 19 23:49:39 2014
@@ -74,13 +74,17 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
@@ -109,10 +113,15 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceFailedLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.LocalResourceTrackerState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerBuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -142,6 +151,7 @@ public class ResourceLocalizationService
   private RecordFactory recordFactory;
   private final ScheduledExecutorService cacheCleanup;
   private LocalizerTokenSecretManager secretManager;
+  private NMStateStoreService stateStore;
 
   private LocalResourcesTracker publicRsrc;
 
@@ -163,7 +173,7 @@ public class ResourceLocalizationService
 
   public ResourceLocalizationService(Dispatcher dispatcher,
       ContainerExecutor exec, DeletionService delService,
-      LocalDirsHandlerService dirsHandler) {
+      LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
 
     super(ResourceLocalizationService.class.getName());
     this.exec = exec;
@@ -175,6 +185,7 @@ public class ResourceLocalizationService
         new ThreadFactoryBuilder()
           .setNameFormat("ResourceLocalizationService Cache Cleanup")
           .build());
+    this.stateStore = stateStore;
   }
 
   FileContext getLocalFileContext(Configuration conf) {
@@ -203,15 +214,17 @@ public class ResourceLocalizationService
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     this.validateConf(conf);
-    this.publicRsrc =
-        new LocalResourcesTrackerImpl(null, dispatcher, true, conf);
+    this.publicRsrc = new LocalResourcesTrackerImpl(null, null, dispatcher,
+        true, conf, stateStore);
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
 
     try {
       FileContext lfs = getLocalFileContext(conf);
       lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
 
-      cleanUpLocalDir(lfs,delService);
+      if (!stateStore.canRecover()) {
+        cleanUpLocalDir(lfs,delService);
+      }
 
       List<String> localDirs = dirsHandler.getLocalDirs();
       for (String localDir : localDirs) {
@@ -239,6 +252,7 @@ public class ResourceLocalizationService
     cacheCleanupPeriod =
       conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
     localizationServerAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
         YarnConfiguration.NM_LOCALIZER_ADDRESS,
         YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
         YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
@@ -249,6 +263,74 @@ public class ResourceLocalizationService
     super.serviceInit(conf);
   }
 
+  //Recover localized resources after an NM restart
+  public void recoverLocalizedResources(RecoveredLocalizationState state)
+      throws URISyntaxException {
+    LocalResourceTrackerState trackerState = state.getPublicTrackerState();
+    recoverTrackerResources(publicRsrc, trackerState);
+
+    for (Map.Entry<String, RecoveredUserResources> userEntry :
+         state.getUserResources().entrySet()) {
+      String user = userEntry.getKey();
+      RecoveredUserResources userResources = userEntry.getValue();
+      trackerState = userResources.getPrivateTrackerState();
+      if (!trackerState.isEmpty()) {
+        LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+            null, dispatcher, true, super.getConfig(), stateStore);
+        LocalResourcesTracker oldTracker = privateRsrc.putIfAbsent(user,
+            tracker);
+        if (oldTracker != null) {
+          tracker = oldTracker;
+        }
+        recoverTrackerResources(tracker, trackerState);
+      }
+
+      for (Map.Entry<ApplicationId, LocalResourceTrackerState> appEntry :
+           userResources.getAppTrackerStates().entrySet()) {
+        trackerState = appEntry.getValue();
+        if (!trackerState.isEmpty()) {
+          ApplicationId appId = appEntry.getKey();
+          String appIdStr = ConverterUtils.toString(appId);
+          LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+              appId, dispatcher, false, super.getConfig(), stateStore);
+          LocalResourcesTracker oldTracker = appRsrc.putIfAbsent(appIdStr,
+              tracker);
+          if (oldTracker != null) {
+            tracker = oldTracker;
+          }
+          recoverTrackerResources(tracker, trackerState);
+        }
+      }
+    }
+  }
+
+  private void recoverTrackerResources(LocalResourcesTracker tracker,
+      LocalResourceTrackerState state) throws URISyntaxException {
+    for (LocalizedResourceProto proto : state.getLocalizedResources()) {
+      LocalResource rsrc = new LocalResourcePBImpl(proto.getResource());
+      LocalResourceRequest req = new LocalResourceRequest(rsrc);
+      LOG.info("Recovering localized resource " + req + " at "
+          + proto.getLocalPath());
+      tracker.handle(new ResourceRecoveredEvent(req,
+          new Path(proto.getLocalPath()), proto.getSize()));
+    }
+
+    for (Map.Entry<LocalResourceProto, Path> entry :
+         state.getInProgressResources().entrySet()) {
+      LocalResource rsrc = new LocalResourcePBImpl(entry.getKey());
+      LocalResourceRequest req = new LocalResourceRequest(rsrc);
+      Path localPath = entry.getValue();
+      tracker.handle(new ResourceRecoveredEvent(req, localPath, 0));
+
+      // delete any in-progress localizations, containers will request again
+      LOG.info("Deleting in-progress localization for " + req + " at "
+          + localPath);
+      tracker.remove(tracker.getLocalizedResource(req), delService);
+    }
+
+    // TODO: remove untracked directories in local filesystem
+  }
+
   @Override
   public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) {
     return localizerTracker.processHeartbeat(status);
@@ -261,7 +343,9 @@ public class ResourceLocalizationService
     server = createServer();
     server.start();
     localizationServerAddress =
-        getConfig().updateConnectAddr(YarnConfiguration.NM_LOCALIZER_ADDRESS,
+        getConfig().updateConnectAddr(YarnConfiguration.NM_BIND_HOST,
+                                      YarnConfiguration.NM_LOCALIZER_ADDRESS,
+                                      YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
                                       server.getListenerAddress());
     LOG.info("Localizer started on port " + server.getPort());
     super.serviceStart();
@@ -337,17 +421,10 @@ public class ResourceLocalizationService
     // 0) Create application tracking structs
     String userName = app.getUser();
     privateRsrc.putIfAbsent(userName, new LocalResourcesTrackerImpl(userName,
-      dispatcher, true, super.getConfig()));
-    if (null != appRsrc.putIfAbsent(
-      ConverterUtils.toString(app.getAppId()),
-      new LocalResourcesTrackerImpl(app.getUser(), dispatcher, false, super
-        .getConfig()))) {
-      LOG.warn("Initializing application " + app + " already present");
-      assert false; // TODO: FIXME assert doesn't help
-                    // ^ The condition is benign. Tests should fail and it
-                    // should appear in logs, but it's an internal error
-                    // that should have no effect on applications
-    }
+        null, dispatcher, true, super.getConfig(), stateStore));
+    String appIdStr = ConverterUtils.toString(app.getAppId());
+    appRsrc.putIfAbsent(appIdStr, new LocalResourcesTrackerImpl(app.getUser(),
+        app.getAppId(), dispatcher, false, super.getConfig(), stateStore));
     // 1) Signal container init
     //
     // This is handled by the ApplicationImpl state machine and allows
@@ -446,18 +523,28 @@ public class ResourceLocalizationService
 
   @SuppressWarnings({"unchecked"})
   private void handleDestroyApplicationResources(Application application) {
-    String userName;
-    String appIDStr;
+    String userName = application.getUser();
+    ApplicationId appId = application.getAppId();
+    String appIDStr = application.toString();
     LocalResourcesTracker appLocalRsrcsTracker =
-      appRsrc.remove(ConverterUtils.toString(application.getAppId()));
-    if (null == appLocalRsrcsTracker) {
+      appRsrc.remove(ConverterUtils.toString(appId));
+    if (appLocalRsrcsTracker != null) {
+      for (LocalizedResource rsrc : appLocalRsrcsTracker ) {
+        Path localPath = rsrc.getLocalPath();
+        if (localPath != null) {
+          try {
+            stateStore.removeLocalizedResource(userName, appId, localPath);
+          } catch (IOException e) {
+            LOG.error("Unable to remove resource " + rsrc + " for " + appIDStr
+                + " from state store", e);
+          }
+        }
+      }
+    } else {
       LOG.warn("Removing uninitialized application " + application);
     }
-    // TODO: What to do with appLocalRsrcsTracker?
 
     // Delete the application directories
-    userName = application.getUser();
-    appIDStr = application.toString();
     for (String localDir : dirsHandler.getLocalDirs()) {
 
       // Delete the user-owned app-dir
@@ -668,19 +755,15 @@ public class ResourceLocalizationService
         if (rsrc.getState().equals(ResourceState.DOWNLOADING)) {
           LocalResource resource = request.getResource().getRequest();
           try {
-            Path publicDirDestPath =
+            Path publicRootPath =
                 dirsHandler.getLocalPathForWrite("." + Path.SEPARATOR
                     + ContainerLocalizer.FILECACHE,
                   ContainerLocalizer.getEstimatedSize(resource), true);
-            Path hierarchicalPath =
-                publicRsrc.getPathForLocalization(key, publicDirDestPath);
-            if (!hierarchicalPath.equals(publicDirDestPath)) {
-              publicDirDestPath = hierarchicalPath;
+            Path publicDirDestPath =
+                publicRsrc.getPathForLocalization(key, publicRootPath);
+            if (!publicDirDestPath.getParent().equals(publicRootPath)) {
               DiskChecker.checkDir(new File(publicDirDestPath.toUri().getPath()));
             }
-            publicDirDestPath =
-                new Path(publicDirDestPath, Long.toString(publicRsrc
-                  .nextUniqueNumber()));
             // explicitly synchronize pending here to avoid future task
             // completing and being dequeued before pending updated
             synchronized (pending) {
@@ -968,9 +1051,8 @@ public class ResourceLocalizationService
       Path dirPath =
           dirsHandler.getLocalPathForWrite(cacheDirectory,
             ContainerLocalizer.getEstimatedSize(rsrc), false);
-      dirPath = tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
-        dirPath);
-      return new Path (dirPath, Long.toString(tracker.nextUniqueNumber()));
+      return tracker.getPathForLocalization(new LocalResourceRequest(rsrc),
+          dirPath);
     }
 
     @Override

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Tue Aug 19 23:49:39 2014
@@ -31,5 +31,7 @@ public enum ResourceEventType {
   /** See {@link ResourceReleaseEvent} */
   RELEASE,
   /** See {@link ResourceFailedLocalizationEvent} */
-  LOCALIZATION_FAILED
+  LOCALIZATION_FAILED,
+  /** See {@link ResourceRecoveredEvent} */
+  RECOVERED
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Tue Aug 19 23:49:39 2014
@@ -25,5 +25,7 @@ public interface AppLogAggregator extend
   void startContainerLogAggregation(ContainerId containerId,
       boolean wasContainerSuccessful);
 
+  void abortLogAggregation();
+
   void finishLogAggregation();
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Tue Aug 19 23:49:39 2014
@@ -70,6 +70,7 @@ public class AppLogAggregatorImpl implem
   private final BlockingQueue<ContainerId> pendingContainers;
   private final AtomicBoolean appFinishing = new AtomicBoolean();
   private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+  private final AtomicBoolean aborted = new AtomicBoolean();
   private final Map<ApplicationAccessType, String> appAcls;
 
   private LogWriter writer = null;
@@ -150,7 +151,7 @@ public class AppLogAggregatorImpl implem
   private void doAppLogAggregation() {
     ContainerId containerId;
 
-    while (!this.appFinishing.get()) {
+    while (!this.appFinishing.get() && !this.aborted.get()) {
       synchronized(this) {
         try {
           wait(THREAD_SLEEP_TIME);
@@ -161,6 +162,10 @@ public class AppLogAggregatorImpl implem
       }
     }
 
+    if (this.aborted.get()) {
+      return;
+    }
+
     // Application is finished. Finish pending-containers
     while ((containerId = this.pendingContainers.poll()) != null) {
       uploadLogsForContainer(containerId);
@@ -255,4 +260,11 @@ public class AppLogAggregatorImpl implem
     this.appFinishing.set(true);
     this.notifyAll();
   }
+
+  @Override
+  public synchronized void abortLogAggregation() {
+    LOG.info("Aborting log aggregation for " + this.applicationId);
+    this.aborted.set(true);
+    this.notifyAll();
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Tue Aug 19 23:49:39 2014
@@ -142,9 +142,17 @@ public class LogAggregationService exten
    
   private void stopAggregators() {
     threadPool.shutdown();
+    // if recovery on restart is supported then leave outstanding aggregations
+    // to the next restart
+    boolean shouldAbort = context.getNMStateStore().canRecover()
+        && !context.getDecommissioned();
     // politely ask to finish
     for (AppLogAggregator aggregator : appLogAggregators.values()) {
-      aggregator.finishLogAggregation();
+      if (shouldAbort) {
+        aggregator.abortLogAggregation();
+      } else {
+        aggregator.finishLogAggregation();
+      }
     }
     while (!threadPool.isTerminated()) { // wait for all threads to finish
       for (ApplicationId appId : appLogAggregators.keySet()) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java Tue Aug 19 23:49:39 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -69,6 +70,8 @@ public class ContainersMonitorImpl exten
   private boolean pmemCheckEnabled;
   private boolean vmemCheckEnabled;
 
+  private long maxVCoresAllottedForContainers;
+
   private static final long UNKNOWN_MEMORY_LIMIT = -1L;
 
   public ContainersMonitorImpl(ContainerExecutor exec,
@@ -107,10 +110,16 @@ public class ContainersMonitorImpl exten
         YarnConfiguration.NM_PMEM_MB,
         YarnConfiguration.DEFAULT_NM_PMEM_MB) * 1024 * 1024l;
 
+    long configuredVCoresForContainers = conf.getLong(
+        YarnConfiguration.NM_VCORES,
+        YarnConfiguration.DEFAULT_NM_VCORES);
+
+
     // Setting these irrespective of whether checks are enabled. Required in
     // the UI.
     // ///////// Physical memory configuration //////
     this.maxPmemAllottedForContainers = configuredPMemForContainers;
+    this.maxVCoresAllottedForContainers = configuredVCoresForContainers;
 
     // ///////// Virtual memory configuration //////
     float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
@@ -403,6 +412,7 @@ public class ContainersMonitorImpl exten
 
             boolean isMemoryOverLimit = false;
             String msg = "";
+            int containerExitStatus = ContainerExitStatus.INVALID;
             if (isVmemCheckEnabled()
                 && isProcessTreeOverLimit(containerId.toString(),
                     currentVmemUsage, curMemUsageOfAgedProcesses, vmemLimit)) {
@@ -414,6 +424,7 @@ public class ContainersMonitorImpl exten
                   currentPmemUsage, pmemLimit,
                   pId, containerId, pTree);
               isMemoryOverLimit = true;
+              containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_VMEM;
             } else if (isPmemCheckEnabled()
                 && isProcessTreeOverLimit(containerId.toString(),
                     currentPmemUsage, curRssMemUsageOfAgedProcesses,
@@ -426,6 +437,7 @@ public class ContainersMonitorImpl exten
                   currentPmemUsage, pmemLimit,
                   pId, containerId, pTree);
               isMemoryOverLimit = true;
+              containerExitStatus = ContainerExitStatus.KILLED_EXCEEDED_PMEM;
             }
 
             if (isMemoryOverLimit) {
@@ -440,7 +452,8 @@ public class ContainersMonitorImpl exten
               }
               // kill the container
               eventDispatcher.getEventHandler().handle(
-                  new ContainerKillEvent(containerId, msg));
+                  new ContainerKillEvent(containerId,
+                      containerExitStatus, msg));
               it.remove();
               LOG.info("Removed ProcessTree with root " + pId);
             } else {
@@ -513,6 +526,11 @@ public class ContainersMonitorImpl exten
     return this.maxPmemAllottedForContainers;
   }
 
+  @Override
+  public long getVCoresAllocatedForContainers() {
+    return this.maxVCoresAllottedForContainers;
+  }
+
   /**
    * Is the total virtual memory check enabled?
    *

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java Tue Aug 19 23:49:39 2014
@@ -40,6 +40,9 @@ public class NodeManagerMetrics {
   @Metric("Current # of allocated containers")
       MutableGaugeInt allocatedContainers;
   @Metric MutableGaugeInt availableGB;
+  @Metric("Current allocated Virtual Cores")
+      MutableGaugeInt allocatedVCores;
+  @Metric MutableGaugeInt availableVCores;
 
   public static NodeManagerMetrics create() {
     return create(DefaultMetricsSystem.instance());
@@ -88,16 +91,21 @@ public class NodeManagerMetrics {
     allocatedContainers.incr();
     allocatedGB.incr(res.getMemory() / 1024);
     availableGB.decr(res.getMemory() / 1024);
+    allocatedVCores.incr(res.getVirtualCores());
+    availableVCores.decr(res.getVirtualCores());
   }
 
   public void releaseContainer(Resource res) {
     allocatedContainers.decr();
     allocatedGB.decr(res.getMemory() / 1024);
     availableGB.incr(res.getMemory() / 1024);
+    allocatedVCores.decr(res.getVirtualCores());
+    availableVCores.incr(res.getVirtualCores());
   }
 
   public void addResource(Resource res) {
     availableGB.incr(res.getMemory() / 1024);
+    availableVCores.incr(res.getVirtualCores());
   }
   
   public int getRunningContainers() {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.security;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -33,6 +34,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerTokensState;
 import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 
@@ -49,14 +53,74 @@ public class NMContainerTokenSecretManag
   
   private MasterKeyData previousMasterKey;
   private final TreeMap<Long, List<ContainerId>> recentlyStartedContainerTracker;
-
+  private final NMStateStoreService stateStore;
   
   private String nodeHostAddr;
   
   public NMContainerTokenSecretManager(Configuration conf) {
+    this(conf, new NMNullStateStoreService());
+  }
+
+  public NMContainerTokenSecretManager(Configuration conf,
+      NMStateStoreService stateStore) {
     super(conf);
     recentlyStartedContainerTracker =
         new TreeMap<Long, List<ContainerId>>();
+    this.stateStore = stateStore;
+  }
+
+  public synchronized void recover()
+      throws IOException {
+    RecoveredContainerTokensState state =
+        stateStore.loadContainerTokensState();
+    MasterKey key = state.getCurrentMasterKey();
+    if (key != null) {
+      super.currentMasterKey =
+          new MasterKeyData(key, createSecretKey(key.getBytes().array()));
+    }
+
+    key = state.getPreviousMasterKey();
+    if (key != null) {
+      previousMasterKey =
+          new MasterKeyData(key, createSecretKey(key.getBytes().array()));
+    }
+
+    // restore the serial number from the current master key
+    if (super.currentMasterKey != null) {
+      super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
+    }
+
+    for (Entry<ContainerId, Long> entry : state.getActiveTokens().entrySet()) {
+      ContainerId containerId = entry.getKey();
+      Long expTime = entry.getValue();
+      List<ContainerId> containerList =
+          recentlyStartedContainerTracker.get(expTime);
+      if (containerList == null) {
+        containerList = new ArrayList<ContainerId>();
+        recentlyStartedContainerTracker.put(expTime, containerList);
+      }
+      if (!containerList.contains(containerId)) {
+        containerList.add(containerId);
+      }
+    }
+  }
+
+  private void updateCurrentMasterKey(MasterKeyData key) {
+    super.currentMasterKey = key;
+    try {
+      stateStore.storeContainerTokenCurrentMasterKey(key.getMasterKey());
+    } catch (IOException e) {
+      LOG.error("Unable to update current master key in state store", e);
+    }
+  }
+
+  private void updatePreviousMasterKey(MasterKeyData key) {
+    previousMasterKey = key;
+    try {
+      stateStore.storeContainerTokenPreviousMasterKey(key.getMasterKey());
+    } catch (IOException e) {
+      LOG.error("Unable to update previous master key in state store", e);
+    }
   }
 
   /**
@@ -68,21 +132,16 @@ public class NMContainerTokenSecretManag
    */
   @Private
   public synchronized void setMasterKey(MasterKey masterKeyRecord) {
-    LOG.info("Rolling master-key for container-tokens, got key with id "
-        + masterKeyRecord.getKeyId());
-    if (super.currentMasterKey == null) {
-      super.currentMasterKey =
-          new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
-            .getBytes().array()));
-    } else {
-      if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord
-          .getKeyId()) {
-        // Update keys only if the key has changed.
-        this.previousMasterKey = super.currentMasterKey;
-        super.currentMasterKey =
-            new MasterKeyData(masterKeyRecord, createSecretKey(masterKeyRecord
-              .getBytes().array()));
+    // Update keys only if the key has changed.
+    if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
+          .getKeyId() != masterKeyRecord.getKeyId()) {
+      LOG.info("Rolling master-key for container-tokens, got key with id "
+          + masterKeyRecord.getKeyId());
+      if (super.currentMasterKey != null) {
+        updatePreviousMasterKey(super.currentMasterKey);
       }
+      updateCurrentMasterKey(new MasterKeyData(masterKeyRecord,
+          createSecretKey(masterKeyRecord.getBytes().array())));
     }
   }
 
@@ -137,14 +196,19 @@ public class NMContainerTokenSecretManag
 
     removeAnyContainerTokenIfExpired();
     
+    ContainerId containerId = tokenId.getContainerID();
     Long expTime = tokenId.getExpiryTimeStamp();
     // We might have multiple containers with same expiration time.
     if (!recentlyStartedContainerTracker.containsKey(expTime)) {
       recentlyStartedContainerTracker
         .put(expTime, new ArrayList<ContainerId>());
     }
-    recentlyStartedContainerTracker.get(expTime).add(tokenId.getContainerID());
-
+    recentlyStartedContainerTracker.get(expTime).add(containerId);
+    try {
+      stateStore.storeContainerToken(containerId, expTime);
+    } catch (IOException e) {
+      LOG.error("Unable to store token for container " + containerId, e);
+    }
   }
 
   protected synchronized void removeAnyContainerTokenIfExpired() {
@@ -155,6 +219,13 @@ public class NMContainerTokenSecretManag
     while (containersI.hasNext()) {
       Entry<Long, List<ContainerId>> containerEntry = containersI.next();
       if (containerEntry.getKey() < currTime) {
+        for (ContainerId container : containerEntry.getValue()) {
+          try {
+            stateStore.removeContainerToken(container);
+          } catch (IOException e) {
+            LOG.error("Unable to remove token for container " + container, e);
+          }
+        }
         containersI.remove();
       } else {
         break;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.security;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +32,9 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 
@@ -45,16 +49,79 @@ public class NMTokenSecretManagerInNM ex
   
   private final Map<ApplicationAttemptId, MasterKeyData> oldMasterKeys;
   private final Map<ApplicationId, List<ApplicationAttemptId>> appToAppAttemptMap;
+  private final NMStateStoreService stateStore;
   private NodeId nodeId;                                                      
   
-  
   public NMTokenSecretManagerInNM() {
+    this(new NMNullStateStoreService());
+  }
+
+  public NMTokenSecretManagerInNM(NMStateStoreService stateStore) {
     this.oldMasterKeys =
         new HashMap<ApplicationAttemptId, MasterKeyData>();
     appToAppAttemptMap =         
         new HashMap<ApplicationId, List<ApplicationAttemptId>>();
+    this.stateStore = stateStore;
   }
   
+  public synchronized void recover()
+      throws IOException {
+    RecoveredNMTokensState state = stateStore.loadNMTokensState();
+    MasterKey key = state.getCurrentMasterKey();
+    if (key != null) {
+      super.currentMasterKey =
+          new MasterKeyData(key, createSecretKey(key.getBytes().array()));
+    }
+
+    key = state.getPreviousMasterKey();
+    if (key != null) {
+      previousMasterKey =
+          new MasterKeyData(key, createSecretKey(key.getBytes().array()));
+    }
+
+    // restore the serial number from the current master key
+    if (super.currentMasterKey != null) {
+      super.serialNo = super.currentMasterKey.getMasterKey().getKeyId() + 1;
+    }
+
+    for (Map.Entry<ApplicationAttemptId, MasterKey> entry :
+         state.getApplicationMasterKeys().entrySet()) {
+      key = entry.getValue();
+      oldMasterKeys.put(entry.getKey(),
+          new MasterKeyData(key, createSecretKey(key.getBytes().array())));
+    }
+
+    // reconstruct app to app attempts map
+    appToAppAttemptMap.clear();
+    for (ApplicationAttemptId attempt : oldMasterKeys.keySet()) {
+      ApplicationId app = attempt.getApplicationId();
+      List<ApplicationAttemptId> attempts = appToAppAttemptMap.get(app);
+      if (attempts == null) {
+        attempts = new ArrayList<ApplicationAttemptId>();
+        appToAppAttemptMap.put(app, attempts);
+      }
+      attempts.add(attempt);
+    }
+  }
+
+  private void updateCurrentMasterKey(MasterKeyData key) {
+    super.currentMasterKey = key;
+    try {
+      stateStore.storeNMTokenCurrentMasterKey(key.getMasterKey());
+    } catch (IOException e) {
+      LOG.error("Unable to update current master key in state store", e);
+    }
+  }
+
+  private void updatePreviousMasterKey(MasterKeyData key) {
+    previousMasterKey = key;
+    try {
+      stateStore.storeNMTokenPreviousMasterKey(key.getMasterKey());
+    } catch (IOException e) {
+      LOG.error("Unable to update previous master key in state store", e);
+    }
+  }
+
   /**
    * Used by NodeManagers to create a token-secret-manager with the key
    * obtained from the RM. This can happen during registration or when the RM
@@ -62,20 +129,16 @@ public class NMTokenSecretManagerInNM ex
    */
   @Private
   public synchronized void setMasterKey(MasterKey masterKey) {
-    LOG.info("Rolling master-key for nm-tokens, got key with id :"
-        + masterKey.getKeyId());
-    if (super.currentMasterKey == null) {
-      super.currentMasterKey =
-          new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
-            .array()));
-    } else {
-      if (super.currentMasterKey.getMasterKey().getKeyId() != masterKey
-        .getKeyId()) {
-        this.previousMasterKey = super.currentMasterKey;
-        super.currentMasterKey =
-            new MasterKeyData(masterKey, createSecretKey(masterKey.getBytes()
-              .array()));
+    // Update keys only if the key has changed.
+    if (super.currentMasterKey == null || super.currentMasterKey.getMasterKey()
+          .getKeyId() != masterKey.getKeyId()) {
+      LOG.info("Rolling master-key for container-tokens, got key with id "
+          + masterKey.getKeyId());
+      if (super.currentMasterKey != null) {
+        updatePreviousMasterKey(super.currentMasterKey);
       }
+      updateCurrentMasterKey(new MasterKeyData(masterKey,
+          createSecretKey(masterKey.getBytes().array())));
     }
   }
 
@@ -128,7 +191,7 @@ public class NMTokenSecretManagerInNM ex
       LOG.debug("Removing application attempts NMToken keys for application "
           + appId);
       for (ApplicationAttemptId appAttemptId : appAttemptList) {
-        this.oldMasterKeys.remove(appAttemptId);
+        removeAppAttemptKey(appAttemptId);
       }
       appToAppAttemptMap.remove(appId);
     } else {
@@ -164,11 +227,11 @@ public class NMTokenSecretManagerInNM ex
           + identifier.getApplicationAttemptId().toString());
       if (identifier.getKeyId() == currentMasterKey.getMasterKey()
         .getKeyId()) {
-        oldMasterKeys.put(appAttemptId, currentMasterKey);
+        updateAppAttemptKey(appAttemptId, currentMasterKey);
       } else if (previousMasterKey != null
           && identifier.getKeyId() == previousMasterKey.getMasterKey()
             .getKeyId()) {
-        oldMasterKeys.put(appAttemptId, previousMasterKey);
+        updateAppAttemptKey(appAttemptId, previousMasterKey);
       } else {
         throw new InvalidToken(
           "Older NMToken should not be used while starting the container.");
@@ -193,4 +256,24 @@ public class NMTokenSecretManagerInNM ex
   public synchronized NodeId getNodeId() {
     return this.nodeId;
   }
+
+  private void updateAppAttemptKey(ApplicationAttemptId attempt,
+      MasterKeyData key) {
+    this.oldMasterKeys.put(attempt, key);
+    try {
+      stateStore.storeNMTokenApplicationMasterKey(attempt,
+          key.getMasterKey());
+    } catch (IOException e) {
+      LOG.error("Unable to store master key for application " + attempt, e);
+    }
+  }
+
+  private void removeAppAttemptKey(ApplicationAttemptId attempt) {
+    this.oldMasterKeys.remove(attempt);
+    try {
+      stateStore.removeNMTokenApplicationMasterKey(attempt);
+    } catch (IOException e) {
+      LOG.error("Unable to remove master key for application " + attempt, e);
+    }
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java Tue Aug 19 23:49:39 2014
@@ -85,6 +85,7 @@ public class ContainerPage extends NMVie
         ._("Diagnostics", info.getDiagnostics())
         ._("User", info.getUser())
         ._("TotalMemoryNeeded", info.getMemoryNeeded())
+        ._("TotalVCoresNeeded", info.getVCoresNeeded())
         ._("logs", info.getShortLogLink(), "Link to logs");
       html._(InfoBlock.class);
     }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java Tue Aug 19 23:49:39 2014
@@ -72,7 +72,9 @@ public class NodePage extends NMView {
           ._("Total Pmem allocated for Container",
               StringUtils.byteDesc(info.getTotalPmemAllocated() * BYTES_IN_MB))
           ._("Pmem enforcement enabled",
-              info.isVmemCheckEnabled())
+              info.isPmemCheckEnabled())
+           ._("Total VCores allocated for Containers",
+              String.valueOf(info.getTotalVCoresAllocated())) 
           ._("NodeHealthyStatus",
               info.getHealthStatus())
           ._("LastNodeHealthTime", new Date(

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Tue Aug 19 23:49:39 2014
@@ -55,7 +55,9 @@ public class WebServer extends AbstractS
 
   @Override
   protected void serviceStart() throws Exception {
-    String bindAddress = WebAppUtils.getNMWebAppURLWithoutScheme(getConfig());
+    String bindAddress = WebAppUtils.getWebAppBindURL(getConfig(),
+                          YarnConfiguration.NM_BIND_HOST,
+                          WebAppUtils.getNMWebAppURLWithoutScheme(getConfig()));
     
     LOG.info("Instantiating NMWebApp at " + bindAddress);
     try {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/dao/ContainerInfo.java Tue Aug 19 23:49:39 2014
@@ -42,6 +42,7 @@ public class ContainerInfo {
   protected String diagnostics;
   protected String user;
   protected long totalMemoryNeededMB;
+  protected long totalVCoresNeeded;
   protected String containerLogsLink;
   protected String nodeId;
   @XmlTransient
@@ -76,6 +77,7 @@ public class ContainerInfo {
     Resource res = container.getResource();
     if (res != null) {
       this.totalMemoryNeededMB = res.getMemory();
+      this.totalVCoresNeeded = res.getVirtualCores();
     }
     this.containerLogsShortLink = ujoin("containerlogs", this.id,
         container.getUser());
@@ -130,4 +132,8 @@ public class ContainerInfo {
     return this.totalMemoryNeededMB;
   }
 
+  public long getVCoresNeeded() {
+    return this.totalVCoresNeeded;
+  }
+
 }