You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/11/02 06:35:03 UTC

svn commit: r1196458 [12/19] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-cl...

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Wed Nov  2 05:34:31 2011
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.StringUtils;
@@ -55,7 +56,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationCleanupEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -77,6 +78,9 @@ public class ContainerImpl implements Co
   private int exitCode = YarnConfiguration.INVALID_CONTAINER_EXIT_STATUS;
   private final StringBuilder diagnostics;
 
+  /** The NM-wide configuration - not specific to this container */
+  private final Configuration daemonConf;
+
   private static final Log LOG = LogFactory.getLog(Container.class);
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private final Map<LocalResourceRequest,String> pendingResources =
@@ -90,9 +94,11 @@ public class ContainerImpl implements Co
   private final List<LocalResourceRequest> appRsrcs =
     new ArrayList<LocalResourceRequest>();
 
-  public ContainerImpl(Dispatcher dispatcher,
+  public ContainerImpl(Configuration conf,
+      Dispatcher dispatcher,
       ContainerLaunchContext launchContext, Credentials creds,
       NodeManagerMetrics metrics) {
+    this.daemonConf = conf;
     this.dispatcher = dispatcher;
     this.launchContext = launchContext;
     this.diagnostics = new StringBuilder();
@@ -152,6 +158,19 @@ public class ContainerImpl implements Co
         ContainerState.LOCALIZATION_FAILED,
         ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
         UPDATE_DIAGNOSTICS_TRANSITION)
+    // container not launched so kill is a no-op
+    .addTransition(ContainerState.LOCALIZATION_FAILED,
+        ContainerState.LOCALIZATION_FAILED,
+        ContainerEventType.KILL_CONTAINER)
+    // container cleanup triggers a release of all resources
+    // regardless of whether they were localized or not
+    // LocalizedResource handles release event in all states
+    .addTransition(ContainerState.LOCALIZATION_FAILED,
+        ContainerState.LOCALIZATION_FAILED,
+        ContainerEventType.RESOURCE_LOCALIZED)
+    .addTransition(ContainerState.LOCALIZATION_FAILED,
+        ContainerState.LOCALIZATION_FAILED,
+        ContainerEventType.RESOURCE_FAILED)
 
     // From LOCALIZED State
     .addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
@@ -162,8 +181,6 @@ public class ContainerImpl implements Co
     .addTransition(ContainerState.LOCALIZED, ContainerState.LOCALIZED,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
-       // TODO race: Can lead to a CONTAINER_LAUNCHED event at state KILLING, 
-       // and a container which will never be killed by the NM.
     .addTransition(ContainerState.LOCALIZED, ContainerState.KILLING,
         ContainerEventType.KILL_CONTAINER, new KillTransition())
 
@@ -218,6 +235,9 @@ public class ContainerImpl implements Co
         ContainerState.KILLING,
         ContainerEventType.RESOURCE_LOCALIZED,
         new LocalizedResourceDuringKillTransition())
+    .addTransition(ContainerState.KILLING, 
+        ContainerState.KILLING, 
+        ContainerEventType.RESOURCE_FAILED)
     .addTransition(ContainerState.KILLING, ContainerState.KILLING,
        ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
        UPDATE_DIAGNOSTICS_TRANSITION)
@@ -233,6 +253,12 @@ public class ContainerImpl implements Co
             ContainerState.DONE,
             ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP,
             CONTAINER_DONE_TRANSITION)
+    // A CONTAINER_LAUNCHED event received while KILLING is a no-op, because
+    // container cleanup is always handled after the launch container event
+    // in the container launcher
+    .addTransition(ContainerState.KILLING,
+        ContainerState.KILLING,
+        ContainerEventType.CONTAINER_LAUNCHED)
 
     // From CONTAINER_CLEANEDUP_AFTER_KILL State.
     .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@@ -399,7 +425,7 @@ public class ContainerImpl implements Co
     // Remove the container from the resource-monitor
     eventHandler.handle(new ContainerStopMonitoringEvent(containerID));
     // Tell the logService too
-    eventHandler.handle(new LogAggregatorContainerFinishedEvent(
+    eventHandler.handle(new LogHandlerContainerFinishedEvent(
         containerID, exitCode));
   }
 
@@ -431,6 +457,20 @@ public class ContainerImpl implements Co
 
   }
 
+  /**
+   * State transition when a NEW container receives the INIT_CONTAINER
+   * message.
+   * 
+   * If there are resources to localize, sends a
+   * ContainerLocalizationRequest (INIT_CONTAINER_RESOURCES) 
+   * to the ResourceLocalizationManager and enters LOCALIZING state.
+   * 
+   * If there are no resources to localize, sends LAUNCH_CONTAINER event
+   * and enters LOCALIZED state directly.
+   * 
+   * If there are any invalid resources specified, enters LOCALIZATION_FAILED
+   * directly.
+   */
   @SuppressWarnings("unchecked") // dispatcher not typed
   static class RequestResourcesTransition implements
       MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@@ -513,6 +553,10 @@ public class ContainerImpl implements Co
     }
   }
 
+  /**
+   * Transition when one of the requested resources for this container
+   * has been successfully localized.
+   */
   @SuppressWarnings("unchecked") // dispatcher not typed
   static class LocalizedTransition implements
       MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
@@ -540,22 +584,34 @@ public class ContainerImpl implements Co
     }
   }
 
+  /**
+   * Transition from LOCALIZED state to RUNNING state upon receiving
+   * a CONTAINER_LAUNCHED event
+   */
   @SuppressWarnings("unchecked") // dispatcher not typed
   static class LaunchTransition extends ContainerTransition {
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       // Inform the ContainersMonitor to start monitoring the container's
       // resource usage.
-      // TODO: Fix pmem limits below
-      long vmemBytes =
+      long pmemBytes =
           container.getLaunchContext().getResource().getMemory() * 1024 * 1024L;
+      float pmemRatio = container.daemonConf.getFloat(
+          YarnConfiguration.NM_VMEM_PMEM_RATIO,
+          YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
+      long vmemBytes = (long) (pmemRatio * pmemBytes);
+      
       container.dispatcher.getEventHandler().handle(
           new ContainerStartMonitoringEvent(container.getContainerID(),
-              vmemBytes, -1));
+              vmemBytes, pmemBytes));
       container.metrics.runningContainer();
     }
   }
 
+  /**
+   * Transition from RUNNING or KILLING state to EXITED_WITH_SUCCESS state
+   * upon a CONTAINER_EXITED_WITH_SUCCESS event.
+   */
   @SuppressWarnings("unchecked")  // dispatcher not typed
   static class ExitedWithSuccessTransition extends ContainerTransition {
 
@@ -582,6 +638,10 @@ public class ContainerImpl implements Co
     }
   }
 
+  /**
+   * Transition to EXITED_WITH_FAILURE state upon
+   * CONTAINER_EXITED_WITH_FAILURE event.
+   **/
   @SuppressWarnings("unchecked")  // dispatcher not typed
   static class ExitedWithFailureTransition extends ContainerTransition {
 
@@ -609,6 +669,9 @@ public class ContainerImpl implements Co
     }
   }
 
+  /**
+   * Transition to EXITED_WITH_FAILURE upon receiving KILLED_ON_REQUEST
+   */
   static class KilledExternallyTransition extends ExitedWithFailureTransition {
     KilledExternallyTransition() {
       super(true);
@@ -621,6 +684,10 @@ public class ContainerImpl implements Co
     }
   }
 
+  /**
+   * Transition from LOCALIZING to LOCALIZATION_FAILED upon receiving
+   * RESOURCE_FAILED event.
+   */
   static class ResourceFailedTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -638,7 +705,11 @@ public class ContainerImpl implements Co
       container.metrics.endInitingContainer();
     }
   }
-  
+
+  /**
+   * Transition from LOCALIZING to KILLING upon receiving
+   * KILL_CONTAINER event.
+   */
   static class KillDuringLocalizationTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -652,6 +723,10 @@ public class ContainerImpl implements Co
     }
   }
 
+  /**
+   * Remain in KILLING state when receiving a RESOURCE_LOCALIZED event
+   * while in the process of killing.
+   */
   static class LocalizedResourceDuringKillTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -669,6 +744,11 @@ public class ContainerImpl implements Co
     }
   }
 
+  /**
+   * Transitions upon receiving KILL_CONTAINER:
+   * - LOCALIZED -> KILLING
+   * - RUNNING -> KILLING
+   */
   @SuppressWarnings("unchecked") // dispatcher not typed
   static class KillTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
@@ -683,6 +763,10 @@ public class ContainerImpl implements Co
     }
   }
 
+  /**
+   * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
+   * upon receiving CONTAINER_KILLED_ON_REQUEST.
+   */
   static class ContainerKilledTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -696,6 +780,13 @@ public class ContainerImpl implements Co
     }
   }
 
+  /**
+   * Handle the following transitions:
+   * - NEW -> DONE upon KILL_CONTAINER
+   * - {LOCALIZATION_FAILED, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE,
+   *    KILLING, CONTAINER_CLEANEDUP_AFTER_KILL}
+   *   -> DONE upon CONTAINER_RESOURCES_CLEANEDUP
+   */
   static class ContainerDoneTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override
@@ -703,7 +794,10 @@ public class ContainerImpl implements Co
       container.finished();
     }
   }
-  
+
+  /**
+   * Update diagnostics, staying in the same state.
+   */
   static class ContainerDiagnosticsUpdateTransition implements
       SingleArcTransition<ContainerImpl, ContainerEvent> {
     @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Wed Nov  2 05:34:31 2011
@@ -27,11 +27,11 @@ import java.io.OutputStream;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,11 +45,14 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@@ -57,21 +60,33 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
+import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class ContainerLaunch implements Callable<Integer> {
 
   private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);
 
-  public static final String CONTAINER_SCRIPT = "task.sh";
+  public static final String CONTAINER_SCRIPT = "launch_container.sh";
   public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
 
+  private static final String PID_FILE_NAME_FMT = "%s.pid";
+
   private final Dispatcher dispatcher;
   private final ContainerExecutor exec;
   private final Application app;
   private final Container container;
   private final Configuration conf;
   private final LocalDirAllocator logDirsSelector;
+  
+  private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
+  private volatile AtomicBoolean completed = new AtomicBoolean(false);
+
+  private long sleepDelayBeforeSigKill = 250;
+  private long maxKillWaitTime = 2000;
+
+  private Path pidFilePath = null;
 
   public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
       ContainerExecutor exec, Application app, Container container) {
@@ -81,6 +96,12 @@ public class ContainerLaunch implements 
     this.container = container;
     this.dispatcher = dispatcher;
     this.logDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
+    this.sleepDelayBeforeSigKill =
+        conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
+            YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
+    this.maxKillWaitTime =
+        conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
+            YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS);
   }
 
   @Override
@@ -88,7 +109,8 @@ public class ContainerLaunch implements 
   public Integer call() {
     final ContainerLaunchContext launchContext = container.getLaunchContext();
     final Map<Path,String> localResources = container.getLocalizedResources();
-    String containerIdStr = ConverterUtils.toString(container.getContainerID());
+    ContainerId containerID = container.getContainerID();
+    String containerIdStr = ConverterUtils.toString(containerID);
     final String user = launchContext.getUser();
     final List<String> command = launchContext.getCommands();
     int ret = -1;
@@ -97,11 +119,11 @@ public class ContainerLaunch implements 
       // /////////////////////////// Variable expansion
       // Before the container script gets written out.
       List<String> newCmds = new ArrayList<String>(command.size());
-      String appIdStr = app.toString();
+      String appIdStr = app.getAppId().toString();
       Path containerLogDir =
-          this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
-              + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf, 
-              false);
+          this.logDirsSelector.getLocalPathForWrite(ContainerLaunch
+              .getRelativeContainerLogDir(appIdStr, containerIdStr),
+              LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
       for (String str : command) {
         // TODO: Should we instead work via symlinks without this grammar?
         newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
@@ -124,19 +146,18 @@ public class ContainerLaunch implements 
       FileContext lfs = FileContext.getLocalFSFileContext();
       LocalDirAllocator lDirAllocator =
           new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO
+
       Path nmPrivateContainerScriptPath =
           lDirAllocator.getLocalPathForWrite(
-              ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
-                  + appIdStr + Path.SEPARATOR + containerIdStr
-                  + Path.SEPARATOR + CONTAINER_SCRIPT, this.conf);
+              getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
+                  + CONTAINER_SCRIPT, this.conf);
       Path nmPrivateTokensPath =
           lDirAllocator.getLocalPathForWrite(
-              ResourceLocalizationService.NM_PRIVATE_DIR
-                  + Path.SEPARATOR
-                  + containerIdStr
+              getContainerPrivateDir(appIdStr, containerIdStr)
                   + Path.SEPARATOR
                   + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
                       containerIdStr), this.conf);
+
       DataOutputStream containerScriptOutStream = null;
       DataOutputStream tokensOutStream = null;
 
@@ -147,6 +168,17 @@ public class ContainerLaunch implements 
               + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
               + Path.SEPARATOR + containerIdStr,
               LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
+
+      String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
+          containerIdStr);
+
+      // pid file should be in nm private dir so that it is not 
+      // accessible by users
+      pidFilePath = lDirAllocator.getLocalPathForWrite(
+          ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR 
+          + pidFileSuffix,
+          this.conf);
+
       try {
         // /////////// Write out the container-script in the nmPrivate space.
         String[] localDirs =
@@ -191,21 +223,36 @@ public class ContainerLaunch implements 
       // LaunchContainer is a blocking call. We are here almost means the
       // container is launched, so send out the event.
       dispatcher.getEventHandler().handle(new ContainerEvent(
-            container.getContainerID(),
+            containerID,
             ContainerEventType.CONTAINER_LAUNCHED));
 
-      ret =
-          exec.launchContainer(container, nmPrivateContainerScriptPath,
-              nmPrivateTokensPath, user, appIdStr, containerWorkDir);
+      // Check if the container is signalled to be killed.
+      if (!shouldLaunchContainer.compareAndSet(false, true)) {
+        LOG.info("Container " + containerIdStr + " not launched as "
+            + "cleanup already called");
+        ret = ExitCode.TERMINATED.getExitCode();
+      }
+      else {
+        exec.activateContainer(containerID, pidFilePath);
+        ret =
+            exec.launchContainer(container, nmPrivateContainerScriptPath,
+                nmPrivateTokensPath, user, appIdStr, containerWorkDir);
+      }
     } catch (Throwable e) {
       LOG.warn("Failed to launch container", e);
       dispatcher.getEventHandler().handle(new ContainerExitEvent(
             launchContext.getContainerId(),
             ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
       return ret;
+    } finally {
+      completed.set(true);
+      exec.deactivateContainer(containerID);
     }
 
-    if (ret == ExitCode.KILLED.getExitCode()) {
+    LOG.debug("Container " + containerIdStr + " completed with exit code "
+        + ret);
+    if (ret == ExitCode.FORCE_KILLED.getExitCode()
+        || ret == ExitCode.TERMINATED.getExitCode()) {
       // If the process was killed, Send container_cleanedup_after_kill and
       // just break out of this method.
       dispatcher.getEventHandler().handle(
@@ -228,6 +275,129 @@ public class ContainerLaunch implements 
             ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
     return 0;
   }
+  
+  /**
+   * Cleanup the container.
+   * Cancels the launch if launch has not started yet or signals
+   * the executor to not execute the process if not already done so.
+   * Also, sends a SIGTERM followed by a SIGKILL to the process if
+   * the process id is available.
+   * @throws IOException
+   */
+  public void cleanupContainer() throws IOException {
+    ContainerId containerId = container.getContainerID();
+    String containerIdStr = ConverterUtils.toString(containerId);
+    LOG.info("Cleaning up container " + containerIdStr);
+
+    // launch flag will be set to true if process already launched
+    boolean alreadyLaunched = !shouldLaunchContainer.compareAndSet(false, true);
+    if (!alreadyLaunched) {
+      LOG.info("Container " + containerIdStr + " not launched."
+          + " No cleanup needed to be done");
+      return;
+    }
+
+    LOG.debug("Marking container " + containerIdStr + " as inactive");
+    // this should ensure that if the container process has not launched 
+    // by this time, it will never be launched
+    exec.deactivateContainer(containerId);
+
+    LOG.debug("Getting pid for container " + containerIdStr + " to kill"
+        + " from pid file " 
+        + (pidFilePath != null ? pidFilePath.toString() : "null"));
+
+    // however the container process may have already started
+    try {
+
+      // get process id from pid file if available; there is no other
+      // source for the pid, so without a pid file no signal is sent
+      String processId = null;
+      if (pidFilePath != null) {
+        processId = getContainerPid(pidFilePath);
+      }
+
+      // kill process
+      if (processId != null) {
+        String user = container.getLaunchContext().getUser();
+        LOG.debug("Sending signal to pid " + processId
+            + " as user " + user
+            + " for container " + containerIdStr);
+        if (sleepDelayBeforeSigKill > 0) {
+          boolean result = exec.signalContainer(user,
+              processId, Signal.TERM);
+          LOG.debug("Sent signal to pid " + processId
+              + " as user " + user
+              + " for container " + containerIdStr
+              + ", result=" + (result? "success" : "failed"));
+          new DelayedProcessKiller(user,
+              processId, sleepDelayBeforeSigKill, Signal.KILL, exec).start();
+        }
+      }
+    } catch (Exception e) {
+      LOG.warn("Got error when trying to cleanup container " + containerIdStr
+          + ", error=" + e.getMessage());
+    } finally {
+      // cleanup pid file if present
+      if (pidFilePath != null) {
+        FileContext lfs = FileContext.getLocalFSFileContext();
+        lfs.delete(pidFilePath, false);
+      }
+    }
+  }
+
+  /**
+   * Loop through for a time-bounded interval waiting to
+   * read the process id from a file generated by a running process.
+   * @param pidFilePath File from which to read the process id
+   * @return Process ID
+   * @throws Exception
+   */
+  private String getContainerPid(Path pidFilePath) throws Exception {
+    String containerIdStr = 
+        ConverterUtils.toString(container.getContainerID());
+    String processId = null;
+    LOG.debug("Accessing pid for container " + containerIdStr
+        + " from pid file " + pidFilePath);
+    int sleepCounter = 0;
+    final int sleepInterval = 100;
+
+    // loop waiting for pid file to show up 
+    // until either the completed flag is set which means something bad 
+    // happened or our timer expires in which case we admit defeat
+    while (!completed.get()) {
+      processId = ProcessIdFileReader.getProcessId(pidFilePath);
+      if (processId != null) {
+        LOG.debug("Got pid " + processId + " for container "
+            + containerIdStr);
+        break;
+      }
+      else if ((sleepCounter*sleepInterval) > maxKillWaitTime) {
+        LOG.info("Could not get pid for " + containerIdStr
+        		+ ". Waited for " + maxKillWaitTime + " ms.");
+        break;
+      }
+      else {
+        ++sleepCounter;
+        Thread.sleep(sleepInterval);
+      }
+    }
+    return processId;
+  }
+
+  public static String getRelativeContainerLogDir(String appIdStr,
+      String containerIdStr) {
+    return appIdStr + Path.SEPARATOR + containerIdStr;
+  }
+
+  private String getContainerPrivateDir(String appIdStr, String containerIdStr) {
+    return getAppPrivateDir(appIdStr) + Path.SEPARATOR + containerIdStr
+        + Path.SEPARATOR;
+  }
+
+  private String getAppPrivateDir(String appIdStr) {
+    return ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+        + appIdStr;
+  }
 
   private static class ShellScriptBuilder {
     
@@ -260,7 +430,7 @@ public class ContainerLaunch implements 
       if (dst.toUri().getPath().indexOf('/') != -1) {
         line("mkdir -p ", dst.getParent().toString());
       }
-      line("ln -sf ", src.toUri().getPath(), " ", dst.toString());
+      line("ln -sf \"", src.toUri().getPath(), "\" \"", dst.toString(), "\"");
       return this;
     }
   
@@ -279,7 +449,7 @@ public class ContainerLaunch implements 
     public String toString() {
       return sb.toString();
     }
-  
+
   }
 
   private static void putEnvIfNotNull(
@@ -301,7 +471,7 @@ public class ContainerLaunch implements 
     /**
      * Non-modifiable environment variables
      */
-    
+
     putEnvIfNotNull(environment, Environment.USER.name(), container.getUser());
     
     putEnvIfNotNull(environment, 
@@ -335,14 +505,23 @@ public class ContainerLaunch implements 
      * Modifiable environment variables
      */
     
-    putEnvIfAbsent(environment, Environment.JAVA_HOME.name());
-    putEnvIfAbsent(environment, Environment.HADOOP_COMMON_HOME.name());
-    putEnvIfAbsent(environment, Environment.HADOOP_HDFS_HOME.name());
-    putEnvIfAbsent(environment, Environment.YARN_HOME.name());
+    // allow containers to override these variables
+    String[] whitelist = conf.get(YarnConfiguration.NM_ENV_WHITELIST, YarnConfiguration.DEFAULT_NM_ENV_WHITELIST).split(",");
+    
+    for(String whitelistEnvVariable : whitelist) {
+      putEnvIfAbsent(environment, whitelistEnvVariable.trim());
+    }
 
+    // variables here will be forced in, even if the container has specified them.
+    Apps.setEnvFromInputString(
+      environment,
+      conf.get(
+        YarnConfiguration.NM_ADMIN_USER_ENV,
+        YarnConfiguration.DEFAULT_NM_ADMIN_USER_ENV)
+    );
   }
-  
-  private static void writeLaunchEnv(OutputStream out,
+    
+  static void writeLaunchEnv(OutputStream out,
       Map<String,String> environment, Map<Path,String> resources,
       List<String> command)
       throws IOException {
@@ -357,9 +536,9 @@ public class ContainerLaunch implements 
         sb.symlink(link.getKey(), link.getValue());
       }
     }
+
     ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
-    cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec ");
-    cmd.add("/bin/bash ");
+    cmd.add("exec /bin/bash ");
     cmd.add("-c ");
     cmd.add("\"");
     for (String cs : command) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Wed Nov  2 05:34:31 2011
@@ -26,21 +26,25 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * The launcher for the containers. This service should be started only after
  * the {@link ResourceLocalizationService} is started as it depends on creation
@@ -50,22 +54,28 @@ import org.apache.hadoop.yarn.service.Ab
 public class ContainersLauncher extends AbstractService
     implements EventHandler<ContainersLauncherEvent> {
 
+  private static final Log LOG = LogFactory.getLog(ContainersLauncher.class);
+
   private final Context context;
   private final ContainerExecutor exec;
   private final Dispatcher dispatcher;
   private final ExecutorService containerLauncher =
-    Executors.newCachedThreadPool();
+    Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder()
+          .setNameFormat("ContainersLauncher #%d")
+          .build());
   private final Map<ContainerId,RunningContainer> running =
     Collections.synchronizedMap(new HashMap<ContainerId,RunningContainer>());
 
   private static final class RunningContainer {
-    public RunningContainer(String string, Future<Integer> submit) {
-      this.user = string;
+    public RunningContainer(Future<Integer> submit,
+        ContainerLaunch launcher) {
       this.runningcontainer = submit;
+      this.launcher = launcher;
     }
 
-    String user;
     Future<Integer> runningcontainer;
+    ContainerLaunch launcher;
   }
 
 
@@ -99,7 +109,6 @@ public class ContainersLauncher extends 
     // TODO: ContainersLauncher launches containers one by one!!
     Container container = event.getContainer();
     ContainerId containerId = container.getContainerID();
-    String userName = container.getUser();
     switch (event.getType()) {
       case LAUNCH_CONTAINER:
         Application app =
@@ -109,33 +118,26 @@ public class ContainersLauncher extends 
           new ContainerLaunch(getConfig(), dispatcher, exec, app,
               event.getContainer());
         running.put(containerId,
-            new RunningContainer(userName,
-                containerLauncher.submit(launch)));
+            new RunningContainer(containerLauncher.submit(launch), 
+                launch));
         break;
       case CLEANUP_CONTAINER:
         RunningContainer rContainerDatum = running.remove(containerId);
         Future<Integer> rContainer = rContainerDatum.runningcontainer;
-        if (rContainer != null) {
-  
-          if (rContainer.isDone()) {
-            // The future is already done by this time.
-            break;
-          }
-  
-          // Cancel the future so that it won't be launched if it isn't already.
+        if (rContainer != null 
+            && !rContainer.isDone()) {
+          // Cancel the future so that it won't be launched 
+          // if it isn't already.
           rContainer.cancel(false);
-  
-          // Kill the container
-          String processId = exec.getProcessId(containerId);
-          if (processId != null) {
-            try {
-              exec.signalContainer(rContainerDatum.user,
-                  processId, Signal.KILL);
-            } catch (IOException e) {
-              // TODO Auto-generated catch block
-              e.printStackTrace();
-            }
-          }
+        }
+
+        // Cleanup a container whether it is running/killed/completed, so that
+        // no sub-processes are alive.
+        try {
+          rContainerDatum.launcher.cleanupContainer();
+        } catch (IOException e) {
+          LOG.warn("Got exception while cleaning container " + containerId
+              + ". Ignoring.");
         }
         break;
     }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Wed Nov  2 05:34:31 2011
@@ -65,6 +65,9 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class ContainerLocalizer {
 
@@ -178,7 +181,8 @@ public class ContainerLocalizer {
   }
 
   ExecutorService createDownloadThreadPool() {
-    return Executors.newSingleThreadExecutor();
+    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+      .setNameFormat("ContainerLocalizer Downloader").build());
   }
 
   Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourceRequest.java Wed Nov  2 05:34:31 2011
@@ -33,6 +33,7 @@ public class LocalResourceRequest
   private final Path loc;
   private final long timestamp;
   private final LocalResourceType type;
+  private final LocalResourceVisibility visibility;
 
   /**
    * Wrap API resource to match against cache of localized resources.
@@ -43,13 +44,16 @@ public class LocalResourceRequest
       throws URISyntaxException {
     this(ConverterUtils.getPathFromYarnURL(resource.getResource()),
         resource.getTimestamp(),
-        resource.getType());
+        resource.getType(),
+        resource.getVisibility());
   }
 
-  LocalResourceRequest(Path loc, long timestamp, LocalResourceType type) {
+  LocalResourceRequest(Path loc, long timestamp, LocalResourceType type,
+      LocalResourceVisibility visibility) {
     this.loc = loc;
     this.timestamp = timestamp;
     this.type = type;
+    this.visibility = visibility;
   }
 
   @Override
@@ -114,7 +118,7 @@ public class LocalResourceRequest
 
   @Override
   public LocalResourceVisibility getVisibility() {
-    throw new UnsupportedOperationException();
+    return visibility;
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Wed Nov  2 05:34:31 2011
@@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.no
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -37,6 +40,9 @@ import org.apache.hadoop.yarn.server.nod
 class LocalResourcesTrackerImpl implements LocalResourcesTracker {
 
   static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
+  private static final String RANDOM_DIR_REGEX = "-?\\d+";
+  private static final Pattern RANDOM_DIR_PATTERN = Pattern
+      .compile(RANDOM_DIR_REGEX);
 
   private final String user;
   private final Dispatcher dispatcher;
@@ -83,28 +89,44 @@ class LocalResourcesTrackerImpl implemen
 
   @Override
   public boolean remove(LocalizedResource rem, DeletionService delService) {
-    // current synchronization guaranteed by crude RLS event for cleanup
+ // current synchronization guaranteed by crude RLS event for cleanup
     LocalizedResource rsrc = localrsrc.get(rem.getRequest());
     if (null == rsrc) {
-      LOG.error("Attempt to remove absent resource: " + rem.getRequest() +
-          " from " + getUser());
+      LOG.error("Attempt to remove absent resource: " + rem.getRequest()
+          + " from " + getUser());
       return true;
     }
     if (rsrc.getRefCount() > 0
-        || ResourceState.DOWNLOADING.equals(rsrc.getState())
-        || rsrc != rem) {
+        || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) {
       // internal error
-      LOG.error("Attempt to remove resource: " + rsrc + " with non-zero refcount");
+      LOG.error("Attempt to remove resource: " + rsrc
+          + " with non-zero refcount");
       assert false;
       return false;
     }
-    localrsrc.remove(rem.getRequest());
     if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
-      delService.delete(getUser(), rsrc.getLocalPath());
+      delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
     }
     return true;
   }
 
+
+  /**
+   * Returns the path up to the random directory component.
+   */
+  private Path getPathToDelete(Path localPath) {
+    Path delPath = localPath.getParent();
+    String name = delPath.getName();
+    Matcher matcher = RANDOM_DIR_PATTERN.matcher(name);
+    if (matcher.matches()) {
+      return delPath;
+    } else {
+      LOG.warn("Random directory component did not match. " +
+      		"Deleting localized path only");
+      return localPath;
+    }
+  }
+
   @Override
   public String getUser() {
     return user;
@@ -114,5 +136,4 @@ class LocalResourcesTrackerImpl implemen
   public Iterator<LocalizedResource> iterator() {
     return localrsrc.values().iterator();
   }
-
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Wed Nov  2 05:34:31 2011
@@ -205,6 +205,11 @@ public class LocalizedResource implement
     // typedef
   }
 
+  /**
+   * Transition from INIT to DOWNLOADING.
+   * Sends a {@link LocalizerResourceRequestEvent} to the
+   * {@link ResourceLocalizationService}.
+   */
   @SuppressWarnings("unchecked") // dispatcher not typed
   private static class FetchResourceTransition extends ResourceTransition {
     @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Nov  2 05:34:31 2011
@@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.fs.FileUtil;
@@ -61,16 +62,16 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.avro.ipc.Server;
+import org.apache.hadoop.ipc.Server;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -104,11 +105,14 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.FSDownload;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class ResourceLocalizationService extends CompositeService
     implements EventHandler<LocalizationEvent>, LocalizationProtocol {
@@ -133,8 +137,18 @@ public class ResourceLocalizationService
   private final ScheduledExecutorService cacheCleanup;
 
   private final LocalResourcesTracker publicRsrc;
+  
+  /**
+   * Map of LocalResourceTrackers keyed by username, for private
+   * resources.
+   */
   private final ConcurrentMap<String,LocalResourcesTracker> privateRsrc =
     new ConcurrentHashMap<String,LocalResourcesTracker>();
+
+  /**
+   * Map of LocalResourceTrackers keyed by appid, for application
+   * resources.
+   */
   private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
     new ConcurrentHashMap<String,LocalResourcesTracker>();
 
@@ -146,7 +160,10 @@ public class ResourceLocalizationService
     this.delService = delService;
     this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
     this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
-    this.cacheCleanup = new ScheduledThreadPoolExecutor(1);
+    this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
+        new ThreadFactoryBuilder()
+          .setNameFormat("ResourceLocalizationService Cache Cleanup")
+          .build());
   }
 
   FileContext getLocalFileContext(Configuration conf) {
@@ -200,7 +217,9 @@ public class ResourceLocalizationService
     cacheCleanupPeriod =
       conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS);
     localizationServerAddress = NetUtils.createSocketAddr(
-      conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
+      conf.get(YarnConfiguration.NM_LOCALIZER_ADDRESS, YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS),
+      YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT,
+      YarnConfiguration.NM_LOCALIZER_ADDRESS);
     localizerTracker = createLocalizerTracker(conf);
     addService(localizerTracker);
     dispatcher.register(LocalizerEventType.class, localizerTracker);
@@ -217,8 +236,15 @@ public class ResourceLocalizationService
     cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
         cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     server = createServer();
-    LOG.info("Localizer started on port " + server.getPort());
     server.start();
+    String host = getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS)
+        .split(":")[0];
+    getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":" 
+        + server.getPort());
+    localizationServerAddress = NetUtils.createSocketAddr(
+        getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS, 
+            YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
+    LOG.info("Localizer started on port " + server.getPort());
     super.start();
   }
 
@@ -234,155 +260,197 @@ public class ResourceLocalizationService
       secretManager = new LocalizerTokenSecretManager();
     }
     
-    return rpc.getServer(LocalizationProtocol.class, this,
+    Server server = rpc.getServer(LocalizationProtocol.class, this,
         localizationServerAddress, conf, secretManager, 
         conf.getInt(YarnConfiguration.NM_LOCALIZER_CLIENT_THREAD_COUNT, 
             YarnConfiguration.DEFAULT_NM_LOCALIZER_CLIENT_THREAD_COUNT));
-
+    
+    // Enable service authorization?
+    if (conf.getBoolean(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
+        false)) {
+      server.refreshServiceAcl(conf, new NMPolicyProvider());
+    }
+    
+    return server;
   }
 
   @Override
   public void stop() {
     if (server != null) {
-      server.close();
+      server.stop();
     }
     cacheCleanup.shutdown();
     super.stop();
   }
 
   @Override
-  @SuppressWarnings("unchecked") // dispatcher not typed
   public void handle(LocalizationEvent event) {
-    String userName;
-    String appIDStr;
-    Container c;
-    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs;
-    LocalResourcesTracker tracker;
     // TODO: create log dir as $logdir/$user/$appId
     switch (event.getType()) {
     case INIT_APPLICATION_RESOURCES:
-      Application app =
-        ((ApplicationLocalizationEvent)event).getApplication();
-      // 0) Create application tracking structs
-      userName = app.getUser();
-      privateRsrc.putIfAbsent(userName,
-          new LocalResourcesTrackerImpl(userName, dispatcher));
-      if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
-          new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
-        LOG.warn("Initializing application " + app + " already present");
-        assert false; // TODO: FIXME assert doesn't help
-                      // ^ The condition is benign. Tests should fail and it
-                      //   should appear in logs, but it's an internal error
-                      //   that should have no effect on applications
-      }
-      // 1) Signal container init
-      dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
-            app.getAppId()));
+      handleInitApplicationResources(
+          ((ApplicationLocalizationEvent)event).getApplication());
       break;
     case INIT_CONTAINER_RESOURCES:
-      ContainerLocalizationRequestEvent rsrcReqs =
-        (ContainerLocalizationRequestEvent) event;
-      c = rsrcReqs.getContainer();
-      LocalizerContext ctxt = new LocalizerContext(
-          c.getUser(), c.getContainerID(), c.getCredentials());
-      rsrcs = rsrcReqs.getRequestedResources();
-      for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
-           rsrcs.entrySet()) {
-        tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), 
-            c.getContainerID().getApplicationAttemptId().getApplicationId());
-        for (LocalResourceRequest req : e.getValue()) {
-          tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
-        }
-      }
+      handleInitContainerResources((ContainerLocalizationRequestEvent) event);
       break;
     case CACHE_CLEANUP:
-      ResourceRetentionSet retain =
-        new ResourceRetentionSet(delService, cacheTargetSize);
-      retain.addResources(publicRsrc);
-      LOG.debug("Resource cleanup (public) " + retain);
-      for (LocalResourcesTracker t : privateRsrc.values()) {
-        retain.addResources(t);
-        LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
-      }
+      handleCacheCleanup(event);
       break;
     case CLEANUP_CONTAINER_RESOURCES:
-      ContainerLocalizationCleanupEvent rsrcCleanup =
-        (ContainerLocalizationCleanupEvent) event;
-      c = rsrcCleanup.getContainer();
-      rsrcs = rsrcCleanup.getResources();
-      for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
-           rsrcs.entrySet()) {
-        tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), 
-            c.getContainerID().getApplicationAttemptId().getApplicationId());
-        for (LocalResourceRequest req : e.getValue()) {
-          tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
-        }
-      }
-
-      // Delete the container directories
-      userName = c.getUser();
-      String containerIDStr = c.toString();
-      appIDStr =
-        ConverterUtils.toString(
-            c.getContainerID().getApplicationAttemptId().getApplicationId());
-      for (Path localDir : localDirs) {
-
-        // Delete the user-owned container-dir
-        Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
-        Path userdir = new Path(usersdir, userName);
-        Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
-        Path appDir = new Path(allAppsdir, appIDStr);
-        Path containerDir = new Path(appDir, containerIDStr);
-        delService.delete(userName, containerDir, new Path[] {});
-
-        // Delete the nmPrivate container-dir
-        Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
-        Path appSysDir = new Path(sysDir, appIDStr);
-        Path containerSysDir = new Path(appSysDir, containerIDStr);
-        delService.delete(null, containerSysDir,  new Path[] {});
-      }
-
-      dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
-            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+      handleCleanupContainerResources((ContainerLocalizationCleanupEvent)event);
       break;
     case DESTROY_APPLICATION_RESOURCES:
-
-      Application application =
-          ((ApplicationLocalizationEvent) event).getApplication();
-      LocalResourcesTracker appLocalRsrcsTracker =
-        appRsrc.remove(ConverterUtils.toString(application.getAppId()));
-      if (null == appLocalRsrcsTracker) {
-        LOG.warn("Removing uninitialized application " + application);
-      }
-      // TODO: What to do with appLocalRsrcsTracker?
-
-      // Delete the application directories
-      userName = application.getUser();
-      appIDStr = application.toString();
-      for (Path localDir : localDirs) {
-
-        // Delete the user-owned app-dir
-        Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
-        Path userdir = new Path(usersdir, userName);
-        Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
-        Path appDir = new Path(allAppsdir, appIDStr);
-        delService.delete(userName, appDir, new Path[] {});
-
-        // Delete the nmPrivate app-dir
-        Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
-        Path appSysDir = new Path(sysDir, appIDStr);
-        delService.delete(null, appSysDir, new Path[] {});
-      }
-
-      // TODO: decrement reference counts of all resources associated with this
-      // app
-
-      dispatcher.getEventHandler().handle(new ApplicationEvent(
-            application.getAppId(),
-            ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
+      handleDestroyApplicationResources(
+          ((ApplicationLocalizationEvent)event).getApplication());
       break;
+    default:
+      throw new YarnException("Unknown localization event: " + event);
     }
   }
+  
+  /**
+   * Handle event received the first time any container is scheduled
+   * by a given application.
+   */
+  @SuppressWarnings("unchecked")
+  private void handleInitApplicationResources(Application app) {
+    // 0) Create application tracking structs
+    String userName = app.getUser();
+    privateRsrc.putIfAbsent(userName,
+        new LocalResourcesTrackerImpl(userName, dispatcher));
+    if (null != appRsrc.putIfAbsent(ConverterUtils.toString(app.getAppId()),
+        new LocalResourcesTrackerImpl(app.getUser(), dispatcher))) {
+      LOG.warn("Initializing application " + app + " already present");
+      assert false; // TODO: FIXME assert doesn't help
+                    // ^ The condition is benign. Tests should fail and it
+                    //   should appear in logs, but it's an internal error
+                    //   that should have no effect on applications
+    }
+    // 1) Signal container init
+    //
+    // This is handled by the ApplicationImpl state machine and allows
+    // containers to proceed with launching.
+    dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
+          app.getAppId()));
+  }
+  
+  /**
+   * For each of the requested resources for a container, determines the
+   * appropriate {@link LocalResourcesTracker} and forwards a 
+   * {@link LocalResourceRequest} to that tracker.
+   */
+  private void handleInitContainerResources(
+      ContainerLocalizationRequestEvent rsrcReqs) {
+    Container c = rsrcReqs.getContainer();
+    LocalizerContext ctxt = new LocalizerContext(
+        c.getUser(), c.getContainerID(), c.getCredentials());
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+      rsrcReqs.getRequestedResources();
+    for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
+         rsrcs.entrySet()) {
+      LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), 
+          c.getContainerID().getApplicationAttemptId().getApplicationId());
+      for (LocalResourceRequest req : e.getValue()) {
+        tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
+      }
+    }
+  }
+  
+  private void handleCacheCleanup(LocalizationEvent event) {
+    ResourceRetentionSet retain =
+      new ResourceRetentionSet(delService, cacheTargetSize);
+    retain.addResources(publicRsrc);
+    LOG.debug("Resource cleanup (public) " + retain);
+    for (LocalResourcesTracker t : privateRsrc.values()) {
+      retain.addResources(t);
+      LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
+    }
+    //TODO Check if appRsrcs should also be added to the retention set.
+  }
+
+
+  @SuppressWarnings("unchecked")
+  private void handleCleanupContainerResources(
+      ContainerLocalizationCleanupEvent rsrcCleanup) {
+    Container c = rsrcCleanup.getContainer();
+    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
+      rsrcCleanup.getResources();
+    for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
+         rsrcs.entrySet()) {
+      LocalResourcesTracker tracker = getLocalResourcesTracker(e.getKey(), c.getUser(), 
+          c.getContainerID().getApplicationAttemptId().getApplicationId());
+      for (LocalResourceRequest req : e.getValue()) {
+        tracker.handle(new ResourceReleaseEvent(req, c.getContainerID()));
+      }
+    }
+
+    // Delete the container directories
+    String userName = c.getUser();
+    String containerIDStr = c.toString();
+    String appIDStr = ConverterUtils.toString(
+        c.getContainerID().getApplicationAttemptId().getApplicationId());
+    for (Path localDir : localDirs) {
+
+      // Delete the user-owned container-dir
+      Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
+      Path userdir = new Path(usersdir, userName);
+      Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+      Path appDir = new Path(allAppsdir, appIDStr);
+      Path containerDir = new Path(appDir, containerIDStr);
+      delService.delete(userName, containerDir, new Path[] {});
+
+      // Delete the nmPrivate container-dir
+      
+      Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
+      Path appSysDir = new Path(sysDir, appIDStr);
+      Path containerSysDir = new Path(appSysDir, containerIDStr);
+      delService.delete(null, containerSysDir,  new Path[] {});
+    }
+
+    dispatcher.getEventHandler().handle(new ContainerEvent(c.getContainerID(),
+          ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
+  }
+
+
+  @SuppressWarnings({"unchecked"})
+  private void handleDestroyApplicationResources(Application application) {
+    String userName;
+    String appIDStr;
+    LocalResourcesTracker appLocalRsrcsTracker =
+      appRsrc.remove(ConverterUtils.toString(application.getAppId()));
+    if (null == appLocalRsrcsTracker) {
+      LOG.warn("Removing uninitialized application " + application);
+    }
+    // TODO: What to do with appLocalRsrcsTracker?
+
+    // Delete the application directories
+    userName = application.getUser();
+    appIDStr = application.toString();
+    for (Path localDir : localDirs) {
+
+      // Delete the user-owned app-dir
+      Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
+      Path userdir = new Path(usersdir, userName);
+      Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+      Path appDir = new Path(allAppsdir, appIDStr);
+      delService.delete(userName, appDir, new Path[] {});
+
+      // Delete the nmPrivate app-dir
+      Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
+      Path appSysDir = new Path(sysDir, appIDStr);
+      delService.delete(null, appSysDir, new Path[] {});
+    }
+
+    // TODO: decrement reference counts of all resources associated with this
+    // app
+
+    dispatcher.getEventHandler().handle(new ApplicationEvent(
+          application.getAppId(),
+          ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
+  }
+
 
   LocalResourcesTracker getLocalResourcesTracker(
       LocalResourceVisibility visibility, String user, ApplicationId appId) {
@@ -493,6 +561,17 @@ public class ResourceLocalizationService
 
   }
 
+  /** Fixed-size pool sized by NM_LOCALIZER_FETCH_THREAD_COUNT; threads use "PublicLocalizer #%d". */
+  private static ExecutorService createLocalizerExecutor(Configuration conf) {
+    int nThreads = conf.getInt(
+        YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT,
+        YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT);
+    ThreadFactory tf = new ThreadFactoryBuilder()
+      .setNameFormat("PublicLocalizer #%d")
+      .build();
+    return Executors.newFixedThreadPool(nThreads, tf);
+  }
+
   class PublicLocalizer extends Thread {
 
     static final String PUBCACHE_CTXT = "public.cache.dirs";
@@ -508,16 +587,16 @@ public class ResourceLocalizationService
 
     PublicLocalizer(Configuration conf) {
       this(conf, getLocalFileContext(conf),
-           Executors.newFixedThreadPool(conf.getInt(
-               YarnConfiguration.NM_LOCALIZER_FETCH_THREAD_COUNT, YarnConfiguration.DEFAULT_NM_LOCALIZER_FETCH_THREAD_COUNT)),
+           createLocalizerExecutor(conf),
            new HashMap<Future<Path>,LocalizerResourceRequestEvent>(),
            new HashMap<LocalResourceRequest,List<LocalizerResourceRequestEvent>>());
     }
-
+    
     PublicLocalizer(Configuration conf, FileContext lfs,
         ExecutorService threadPool,
         Map<Future<Path>,LocalizerResourceRequestEvent> pending,
         Map<LocalResourceRequest,List<LocalizerResourceRequestEvent>> attempts) {
+      super("Public Localizer");
       this.lfs = lfs;
       this.conf = conf;
       this.pending = pending;
@@ -634,6 +713,7 @@ public class ResourceLocalizationService
       RecordFactoryProvider.getRecordFactory(getConfig());
 
     LocalizerRunner(LocalizerContext context, String localizerId) {
+      super("LocalizerRunner for " + localizerId);
       this.context = context;
       this.localizerId = localizerId;
       this.pending = new ArrayList<LocalizerResourceRequestEvent>();
@@ -762,36 +842,19 @@ public class ResourceLocalizationService
     @Override
     @SuppressWarnings("unchecked") // dispatcher not typed
     public void run() {
+      Path nmPrivateCTokensPath = null;
       try {
         // Use LocalDirAllocator to get nmPrivateDir
-        Path nmPrivateCTokensPath =
+        nmPrivateCTokensPath =
             localDirsSelector.getLocalPathForWrite(
                 NM_PRIVATE_DIR
                     + Path.SEPARATOR
                     + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
                         localizerId), getConfig());
+
         // 0) init queue, etc.
         // 1) write credentials to private dir
-        DataOutputStream tokenOut = null;
-        try {
-          Credentials credentials = context.getCredentials();
-          FileContext lfs = getLocalFileContext(getConfig());
-          tokenOut =
-              lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
-          LOG.info("Writing credentials to the nmPrivate file "
-              + nmPrivateCTokensPath.toString() + ". Credentials list: ");
-          if (LOG.isDebugEnabled()) {
-            for (Token<? extends TokenIdentifier> tk : credentials
-                .getAllTokens()) {
-              LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
-            }
-          }
-          credentials.writeTokenStorageToStream(tokenOut);
-        } finally {
-          if (tokenOut != null) {
-            tokenOut.close();
-          }
-        }
+        writeCredentials(nmPrivateCTokensPath);
         // 2) exec initApplication and wait
         exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
             context.getUser(),
@@ -811,6 +874,31 @@ public class ResourceLocalizationService
         for (LocalizerResourceRequestEvent event : scheduled.values()) {
           event.getResource().unlock();
         }
+        delService.delete(null, nmPrivateCTokensPath, new Path[] {});
+      }
+    }
+    /** Writes context.getCredentials() to the given file (CREATE, OVERWRITE); closes it in finally. */
+    private void writeCredentials(Path nmPrivateCTokensPath)
+        throws IOException {
+      DataOutputStream tokenOut = null;
+      try {
+        Credentials credentials = context.getCredentials();
+        FileContext lfs = getLocalFileContext(getConfig());
+        tokenOut =
+            lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));
+        LOG.info("Writing credentials to the nmPrivate file "
+            + nmPrivateCTokensPath.toString() + ". Credentials list: ");
+        if (LOG.isDebugEnabled()) { // NOTE(review): writes url-encoded tokens to the log - confirm OK
+          for (Token<? extends TokenIdentifier> tk : credentials
+              .getAllTokens()) {
+            LOG.debug(tk.getService() + " : " + tk.encodeToUrlString());
+          }
+        }
+        credentials.writeTokenStorageToStream(tokenOut);
+      } finally {
+        if (tokenOut != null) {
+          tokenOut.close();
+        }
       }
     }
 
@@ -821,6 +909,7 @@ public class ResourceLocalizationService
     private final Dispatcher dispatcher;
 
     public CacheCleanup(Dispatcher dispatcher) {
+      super("CacheCleanup");
       this.dispatcher = dispatcher;
     }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java Wed Nov  2 05:34:31 2011
@@ -22,8 +22,15 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 
+/**
+ * Event that requests that the {@link ResourceLocalizationService} localize
+ * a set of resources for the given container. This is generated by
+ * {@link ContainerImpl} during container initialization.
+ */
 public class ContainerLocalizationRequestEvent extends
     ContainerLocalizationEvent {
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizationEvent.java Wed Nov  2 05:34:31 2011
@@ -19,7 +19,11 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 
+/**
+ * Events handled by {@link ResourceLocalizationService}
+ */
 public class LocalizationEvent extends AbstractEvent<LocalizationEventType> {
 
   public LocalizationEvent(LocalizationEventType event) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEvent.java Wed Nov  2 05:34:31 2011
@@ -18,7 +18,11 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 
+/**
+ * Events delivered to the {@link ResourceLocalizationService}
+ */
 public class LocalizerEvent extends AbstractEvent<LocalizerEventType> {
 
   private final String localizerId;

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerEventType.java Wed Nov  2 05:34:31 2011
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
 
 public enum LocalizerEventType {
+  /** See {@link LocalizerResourceRequestEvent} */
   REQUEST_RESOURCE_LOCALIZATION,
   ABORT_LOCALIZATION
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerResourceRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerResourceRequestEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerResourceRequestEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/LocalizerResourceRequestEvent.java Wed Nov  2 05:34:31 2011
@@ -20,8 +20,13 @@ package org.apache.hadoop.yarn.server.no
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizerContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+/**
+ * Event indicating that the {@link ResourceLocalizationService}
+ * should fetch this resource.
+ */
 public class LocalizerResourceRequestEvent extends LocalizerEvent {
 
   private final LocalizerContext context;

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEventType.java Wed Nov  2 05:34:31 2011
@@ -17,8 +17,17 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event;
 
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
+
+/**
+ * Events delivered to {@link LocalizedResource}. Each of these
+ * events is a subclass of {@link ResourceEvent}.
+ */
 public enum ResourceEventType {
+  /** See {@link ResourceRequestEvent} */
   REQUEST,
+  /** See {@link ResourceLocalizedEvent} */ 
   LOCALIZED,
+  /** See {@link ResourceReleaseEvent} */
   RELEASE
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSelector.java Wed Nov  2 05:34:31 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.no
 
 import java.util.Collection;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -28,23 +30,23 @@ import org.apache.hadoop.security.token.
 public class LocalizerTokenSelector implements
     TokenSelector<LocalizerTokenIdentifier> {
 
+  private static final Log LOG = LogFactory
+      .getLog(LocalizerTokenSelector.class);
+  /** Returns the first token of kind LocalizerTokenIdentifier.KIND, or null; service is not consulted. */
+  @SuppressWarnings("unchecked")
   @Override
   public Token<LocalizerTokenIdentifier> selectToken(Text service,
       Collection<Token<? extends TokenIdentifier>> tokens) {
-    System.err.print("=========== Using localizerTokenSelector");
-//    if (service == null) {
-//      return null;
-//    }
+
+    LOG.debug("Using localizerTokenSelector.");
+
     for (Token<? extends TokenIdentifier> token : tokens) {
-      System.err.print("============ token of kind " + token.getKind() + " is found");
-      if (LocalizerTokenIdentifier.KIND.equals(token.getKind())
-          //&& service.equals(token.getService())
-          ) {
+      LOG.debug("Token of kind " + token.getKind() + " is found");
+      if (LocalizerTokenIdentifier.KIND.equals(token.getKind())) {
         return (Token<LocalizerTokenIdentifier>) token;
       }
     }
-    System.err.print("returning null ========== ");
+    LOG.debug("Returning null.");
     return null;
   }
-
 }