You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/05/10 17:18:59 UTC

svn commit: r1101502 [1/2] - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/ yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ yarn/ya...

Author: vinodkv
Date: Tue May 10 15:18:58 2011
New Revision: 1101502

URL: http://svn.apache.org/viewvc?rev=1101502&view=rev
Log:
Fix NodeManager to use multiple disks for the local files and the logs. Contributed by Vinod Kumar Vavilapalli.

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ContainerLocalizationRequestEvent.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/event/ResourceEvent.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDeletionService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue May 10 15:18:58 2011
@@ -3,6 +3,8 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+    Fix NM to use multiple disks for local files and the userlogs. (vinodkv)
+
     Improved TestJobHistoryEvents and TestJobHistoryParsing. (sharad)
 
     MAPREDUCE-2478. Improve history server. (Siddharth Seth via sharad)

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.c Tue May 10 15:18:58 2011
@@ -261,24 +261,11 @@ char *get_task_credentials_file(const ch
 }
 
 /**
- * Get the job log directory.
- * Ensures that the result is a realpath and that it is underneath the 
- * tt log root.
- */
-char* get_job_log_directory(const char* jobid) {
-  char* log_dir = get_value(TT_LOG_DIR_KEY);
-  if (log_dir == NULL) {
-    fprintf(LOGFILE, "Log directory %s is not configured.\n", TT_LOG_DIR_KEY);
-    return NULL;
-  }
-  char *result = concatenate("%s/%s", "job log dir", 2, log_dir,
+ * Get the job log directory under the given log_root
+ */
+char* get_job_log_directory(const char *log_root, const char* jobid) {
+  return concatenate("%s/%s", "job log dir", 2, log_root,
                              jobid);
-  if (result == NULL) {
-    fprintf(LOGFILE, "failed to get memory in get_job_log_directory for %s"
-            " and %s\n", log_dir, jobid);
-  }
-  free(log_dir);
-  return result;
 }
 
 /*
@@ -345,7 +332,8 @@ static int create_attempt_directories(co
             "Either task_id is null or the user passed is null.\n");
     return -1;
   }
-  int result = 0;
+
+  int result = -1;
 
   char **local_dir = get_values(TT_SYS_DIR_KEY);
 
@@ -366,11 +354,16 @@ static int create_attempt_directories(co
       // continue on to create other task directories
       free(task_dir);
     } else {
+      result = 0;
       free(task_dir);
     }
   }
   free_values(local_dir);
+  if (result != 0) {
+    return result;
+  }
 
+  result = -1;
   // also make the directory for the task logs
   char *job_task_name = malloc(strlen(job_id) + strlen(task_id) + 2);
   if (job_task_name == NULL) {
@@ -378,14 +371,28 @@ static int create_attempt_directories(co
     result = -1;
   } else {
     sprintf(job_task_name, "%s/%s", job_id, task_id);
-    char *log_dir = get_job_log_directory(job_task_name);
-    free(job_task_name);
+
+    char **log_dir = get_values(TT_LOG_DIR_KEY);
     if (log_dir == NULL) {
-      result = -1;
-    } else if (mkdirs(log_dir, perms) != 0) {
-      result = -1;
+      fprintf(LOGFILE, "%s is not configured.\n", TT_LOG_DIR_KEY);
+      return -1;
+    }
+
+    char **log_dir_ptr;
+    for(log_dir_ptr = log_dir; *log_dir_ptr != NULL; ++log_dir_ptr) {
+      char *job_log_dir = get_job_log_directory(*log_dir_ptr, job_task_name);
+      if (job_log_dir == NULL) {
+        free_values(log_dir);
+        return -1;
+      } else if (mkdirs(job_log_dir, perms) != 0) {
+    	free(job_log_dir);
+      } else {
+    	result = 0;
+    	free(job_log_dir);
+      }
     }
-    free(log_dir);
+    free(job_task_name);
+    free_values(log_dir);
   }
   return result;
 }
@@ -654,22 +661,39 @@ int initialize_job(const char *user, con
     return INVALID_ARGUMENT_NUMBER;
   }
 
-  // create the user directory
+  // create the user directory on all disks
   int result = initialize_user(user);
   if (result != 0) {
     return result;
   }
 
-  // create the log directory for the job
-  char *job_log_dir = get_job_log_directory(jobid);
-  if (job_log_dir == NULL) {
-    return -1;
+  ////////////// create the log directories for the app on all disks
+  char **log_roots = get_values(TT_LOG_DIR_KEY);
+  if (log_roots == NULL) {
+    return INVALID_CONFIG_FILE;
   }
-  result = create_directory_for_user(job_log_dir);
-  free(job_log_dir);
-  if (result != 0) {
+  char **log_root;
+  char *any_one_job_log_dir = NULL;
+  for(log_root=log_roots; *log_root != NULL; ++log_root) {
+    char *job_log_dir = get_job_log_directory(*log_root, jobid);
+    if (job_log_dir == NULL) {
+      // try the next one
+    } else if (create_directory_for_user(job_log_dir) != 0) {
+      free(job_log_dir);
+      return -1;
+    } else if (any_one_job_log_dir == NULL) {
+    	any_one_job_log_dir = job_log_dir;
+    } else {
+      free(job_log_dir);
+    }
+  }
+  free_values(log_roots);
+  if (any_one_job_log_dir == NULL) {
+    fprintf(LOGFILE, "Did not create any job-log directories\n");
     return -1;
   }
+  free(any_one_job_log_dir);
+  ////////////// End of creating the log directories for the app on all disks
 
   // open up the credentials file
   int cred_file = open_file_as_task_tracker(nmPrivate_credentials_file);
@@ -711,6 +735,8 @@ int initialize_job(const char *user, con
   }
 
   char *nmPrivate_credentials_file_copy = strdup(nmPrivate_credentials_file);
+  // TODO: FIXME. The user's copy of creds should go to a path selected by
+  // localDirAllocator
   char *cred_file_name = concatenate("%s/%s", "cred file", 2,
 				   primary_job_dir, basename(nmPrivate_credentials_file_copy));
   if (cred_file_name == NULL) {
@@ -1014,6 +1040,7 @@ int delete_as_user(const char *user,
 
   char** ptr;
 
+  // TODO: No switching user? !!!!
   if (baseDirs == NULL || *baseDirs == NULL) {
     return delete_path(subdir, strlen(subdir) == 0);
   }
@@ -1038,7 +1065,8 @@ int delete_as_user(const char *user,
  * delete a given log directory
  */
 int delete_log_directory(const char *subdir) {
-  char* log_subdir = get_job_log_directory(subdir);
+  // TODO: FIXME. This isn't used at all!!!!!
+  char* log_subdir = get_job_log_directory(subdir, subdir); // TODO: FIX
   int ret = -1;
   if (log_subdir != NULL) {
     ret = delete_path(log_subdir, strchr(subdir, '/') == NULL);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/c/container-executor/impl/container-executor.h Tue May 10 15:18:58 2011
@@ -134,11 +134,9 @@ char *get_task_launcher_file(const char*
 char *get_task_credentials_file(const char* work_dir);
 
 /**
- * Get the job log directory.
- * Ensures that the result is a realpath and that it is underneath the 
- * tt log root.
+ * Get the job log directory under log_root
  */
-char* get_job_log_directory(const char* jobid);
+char* get_job_log_directory(const char* log_root, const char* jobid);
 
 char *get_task_log_dir(const char *log_dir, const char *job_id, 
                        const char *attempt_id);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Tue May 10 15:18:58 2011
@@ -67,14 +67,14 @@ public abstract class ContainerExecutor 
    *   Copy $rsrc -> $N/$user/$appId/filecache/idef
    * @param user user name of application owner
    * @param appId id of the application
-   * @param nmLocal path to localized credentials, rsrc by NM
+   * @param nmPrivateContainerTokens path to localized credentials, rsrc by NM
    * @param nmAddr RPC address to contact NM
    * @throws IOException For most application init failures
    * @throws InterruptedException If application init thread is halted by NM
    */
-  public abstract void startLocalizer(Path nmLocal,
+  public abstract void startLocalizer(Path nmPrivateContainerTokens,
       InetSocketAddress nmAddr, String user, String appId, String locId,
-      Path logDir, List<Path> localDirs)
+      List<Path> localDirs)
     throws IOException, InterruptedException;
 
   /**
@@ -83,9 +83,9 @@ public abstract class ContainerExecutor 
    * 
    * @param launchCtxt
    */
-  public abstract int launchContainer(Container container, Path nmLocal,
-      String user, String appId, Path appLogDir, List<Path> appDirs)
-      throws IOException;
+  public abstract int launchContainer(Container container,
+      Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
+      String user, String appId, Path containerWorkDir) throws IOException;
 
   public abstract boolean signalContainer(String user, String pid,
       Signal signal)

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Tue May 10 15:18:58 2011
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.UnsupportedF
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
@@ -56,24 +57,25 @@ public class DefaultContainerExecutor ex
   }
 
   @Override
-  public void startLocalizer(Path nmLocal, InetSocketAddress nmAddr,
-      String user, String appId, String locId, Path logDir,
+  public void startLocalizer(Path nmPrivateContainerTokensPath,
+      InetSocketAddress nmAddr, String user, String appId, String locId,
       List<Path> localDirs) throws IOException, InterruptedException {
 
     ContainerLocalizer localizer =
-        new ContainerLocalizer(user, appId, locId, logDir, localDirs);
+        new ContainerLocalizer(this.lfs, user, appId, locId,
+            localDirs, RecordFactoryProvider.getRecordFactory(getConf()));
 
     createUserLocalDirs(localDirs, user);
     createUserCacheDirs(localDirs, user);
     createAppDirs(localDirs, user, appId);
-    createAppLogDir(logDir, appId);
+    createAppLogDirs(appId);
 
-    Path appStorageDir = getApplicationDir(localDirs, user, appId);
+    // TODO: Why pick the first app dir? LCE does the same; why not random?
+    Path appStorageDir = getFirstApplicationDir(localDirs, user, appId);
 
     String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_FMT, locId);
-    Path appTokens = new Path(nmLocal, tokenFn);
     Path tokenDst = new Path(appStorageDir, tokenFn);
-    lfs.util().copy(appTokens, tokenDst);
+    lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
     lfs.setWorkingDirectory(appStorageDir);
 
     // TODO: DO it over RPC for maintaining similarity?
@@ -81,29 +83,39 @@ public class DefaultContainerExecutor ex
   }
 
   @Override
-  public int launchContainer(Container container, Path nmLocal, String user,
-      String appId, Path appLogDir, List<Path> appDirs) throws IOException {
-    // create container dirs
+  public int launchContainer(Container container,
+      Path nmPrivateContainerScriptPath, Path nmPrivateTokensPath,
+      String userName, String appId, Path containerWorkDir)
+      throws IOException {
+
+    // create container dirs on all disks
     String containerIdStr = ConverterUtils.toString(container.getContainerID());
-    for (Path p : appDirs) {
-      lfs.mkdir(new Path(p, containerIdStr), null, false);
+    String appIdStr =
+        ConverterUtils.toString(container.getContainerID().getAppId());
+    String[] sLocalDirs =
+        getConf().getStrings(NMConfig.NM_LOCAL_DIR, NMConfig.DEFAULT_NM_LOCAL_DIR);
+    for (String sLocalDir : sLocalDirs) {
+      Path usersdir = new Path(sLocalDir, ContainerLocalizer.USERCACHE);
+      Path userdir = new Path(usersdir, userName);
+      Path appCacheDir = new Path(userdir, ContainerLocalizer.APPCACHE);
+      Path appDir = new Path(appCacheDir, appIdStr);
+      Path containerDir = new Path(appDir, containerIdStr);
+      lfs.mkdir(containerDir, null, false);
     }
-    lfs.mkdir(new Path(appLogDir, containerIdStr), null, false);
+
+    // Create the container log-dirs on all disks
+    createContainerLogDirs(appIdStr, containerIdStr);
+
     // copy launch script to work dir
-    // TODO: ROUND_ROBIN Below
-    Path appWorkDir = new Path(appDirs.get(0), containerIdStr);
-    Path launchScript = new Path(nmLocal, ContainerLaunch.CONTAINER_SCRIPT);
-    Path launchDst = new Path(appWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
-    lfs.util().copy(launchScript, launchDst);
-    // copy container tokens to work dir
-    Path appTokens = new Path(nmLocal, String.format(
-        ContainerLocalizer.TOKEN_FILE_FMT,
-        containerIdStr));
+    Path launchDst =
+        new Path(containerWorkDir, ContainerLaunch.CONTAINER_SCRIPT);
+    lfs.util().copy(nmPrivateContainerScriptPath, launchDst);
 
+    // copy container tokens to work dir
     Path tokenDst =
-      new Path(appWorkDir, ContainerLaunch.CONTAINER_TOKENS);
+      new Path(containerWorkDir, ContainerLaunch.CONTAINER_TOKENS);
+    lfs.util().copy(nmPrivateTokensPath, tokenDst);
 
-    lfs.util().copy(appTokens, tokenDst);
     // create log dir under app
     // fork script
     ShellCommandExecutor shExec = null;
@@ -113,7 +125,7 @@ public class DefaultContainerExecutor ex
       String[] command = 
           new String[] { "bash", "-c", launchDst.toUri().getPath().toString() };
       shExec = new ShellCommandExecutor(command,
-          new File(appWorkDir.toUri().getPath()));
+          new File(containerWorkDir.toUri().getPath()));
       launchCommandObjs.put(container.getLaunchContext().getContainerId(), shExec);
       shExec.execute();
     } catch (Exception e) {
@@ -199,7 +211,7 @@ public class DefaultContainerExecutor ex
    * $logdir/$user/$appId */
   private static final short LOGDIR_PERM = (short)0710;
 
-  private Path getApplicationDir(List<Path> localDirs, String user,
+  private Path getFirstApplicationDir(List<Path> localDirs, String user,
       String appId) {
     return getApplicationDir(localDirs.get(0), user, appId);
   }
@@ -233,7 +245,7 @@ public class DefaultContainerExecutor ex
     boolean userDirStatus = false;
     FsPermission userperms = new FsPermission(USER_PERM);
     for (Path localDir : localDirs) {
-      // create $local.dir/usercache/$user
+      // create $local.dir/usercache/$user and its immediate parent
       try {
         lfs.mkdir(getUserCacheDir(localDir, user), userperms, true);
       } catch (IOException e) {
@@ -328,26 +340,65 @@ public class DefaultContainerExecutor ex
           + "in any of the configured local directories for app "
           + appId.toString());
     }
-    // pick random work dir for compatibility
-    // create $local.dir/usercache/$user/appcache/$appId/work
-    Path workDir =
-        new Path(getApplicationDir(localDirs, user, appId),
-            ContainerLocalizer.WORKDIR);
-    lfs.mkdir(workDir, null, true);
   }
 
   /**
-   * Create application log directory.
+   * Create application log directories on all disks.
    */
-  private void createAppLogDir(Path logDir, String appId)
+  private void createAppLogDirs(String appId)
       throws IOException {
-    Path appUserLogDir = new Path(logDir, appId);
-    try {
-      lfs.mkdir(appUserLogDir, new FsPermission(LOGDIR_PERM), true);
-    } catch (IOException e) {
-      throw new IOException(
-          "Could not create app user log directory: " + appUserLogDir, e);
+    String[] rootLogDirs =
+        getConf()
+            .getStrings(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR);
+    
+    boolean appLogDirStatus = false;
+    FsPermission appLogDirPerms = new FsPermission(LOGDIR_PERM);
+    for (String rootLogDir : rootLogDirs) {
+      // create $log.dir/$appid
+      Path appLogDir = new Path(rootLogDir, appId);
+      try {
+        lfs.mkdir(appLogDir, appLogDirPerms, true);
+      } catch (IOException e) {
+        LOG.warn("Unable to create the app-log directory : " + appLogDir, e);
+        continue;
+      }
+      appLogDirStatus = true;
+    }
+    if (!appLogDirStatus) {
+      throw new IOException("Not able to initialize app-log directories "
+          + "in any of the configured local directories for app " + appId);
     }
   }
 
+  /**
+   * Create container log directories on all disks.
+   */
+  private void createContainerLogDirs(String appId, String containerId)
+      throws IOException {
+    String[] rootLogDirs =
+        getConf()
+            .getStrings(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR);
+    
+    boolean containerLogDirStatus = false;
+    FsPermission containerLogDirPerms = new FsPermission(LOGDIR_PERM);
+    for (String rootLogDir : rootLogDirs) {
+      // create $log.dir/$appid/$containerid
+      Path appLogDir = new Path(rootLogDir, appId);
+      Path containerLogDir = new Path(appLogDir, containerId);
+      try {
+        lfs.mkdir(containerLogDir, containerLogDirPerms, true);
+      } catch (IOException e) {
+        LOG.warn("Unable to create the container-log directory : "
+            + appLogDir, e);
+        continue;
+      }
+      containerLogDirStatus = true;
+    }
+    if (!containerLogDirStatus) {
+      throw new IOException(
+          "Not able to initialize container-log directories "
+              + "in any of the configured local directories for container "
+              + containerId);
+    }
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Tue May 10 15:18:58 2011
@@ -100,17 +100,15 @@ public class LinuxContainerExecutor exte
   }
 
   @Override
-  public void startLocalizer(Path nmLocal, InetSocketAddress nmAddr,
-      String user, String appId, String locId, Path logDir,
+  public void startLocalizer(Path nmPrivateContainerTokensPath,
+      InetSocketAddress nmAddr, String user, String appId, String locId,
       List<Path> localDirs) throws IOException, InterruptedException {
-    Path appTokens = new Path(nmLocal, String.format(
-          ContainerLocalizer.TOKEN_FILE_FMT, locId));
     List<String> command = new ArrayList<String>(
       Arrays.asList(containerExecutorExe, 
                     user, 
                     Integer.toString(Commands.INITIALIZE_JOB.getValue()),
                     appId,
-                    appTokens.toUri().getPath().toString()));
+                    nmPrivateContainerTokensPath.toUri().getPath().toString()));
     File jvm =                                  // use same jvm as parent
       new File(new File(System.getProperty("java.home"), "bin"), "java");
     command.add(jvm.toString());
@@ -122,7 +120,6 @@ public class LinuxContainerExecutor exte
     command.add(locId);
     command.add(nmAddr.getHostName());
     command.add(Integer.toString(nmAddr.getPort()));
-    command.add(logDir.toUri().getPath().toString());
     for (Path p : localDirs) {
       command.add(p.toUri().getPath().toString());
     }
@@ -147,22 +144,19 @@ public class LinuxContainerExecutor exte
   }
 
   @Override
-  public int launchContainer(Container container, Path nmLocal, String user,
-      String appId, Path appLogDir, List<Path> appDirs) throws IOException {
-    Path appWorkDir = new Path(appDirs.get(0), container.toString()); // TODO: Use ROUND_ROBIN
-    Path launchScript = new Path(nmLocal, ContainerLaunch.CONTAINER_SCRIPT);
-    Path nmPrivateAppTokenFile = new Path(nmLocal, String.format(
-          ContainerLocalizer.TOKEN_FILE_FMT,
-          ConverterUtils.toString(container.getContainerID())));
+  public int launchContainer(Container container,
+      Path nmPrivateCotainerScriptPath, Path nmPrivateTokensPath,
+      String user, String appId, Path containerWorkDir) throws IOException {
+
     List<String> command = new ArrayList<String>(
       Arrays.asList(containerExecutorExe, 
                     user, 
                     Integer.toString(Commands.LAUNCH_CONTAINER.getValue()),
                     appId,
                     container.toString(),
-                    appWorkDir.toString(),
-                    launchScript.toUri().getPath().toString(),
-                    nmPrivateAppTokenFile.toUri().getPath().toString()));
+                    containerWorkDir.toString(),
+                    nmPrivateCotainerScriptPath.toUri().getPath().toString(),
+                    nmPrivateTokensPath.toUri().getPath().toString()));
     String[] commandArray = command.toArray(new String[command.size()]);
     ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
     launchCommandObjs.put(container.getLaunchContext().getContainerId(), shExec);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue May 10 15:18:58 2011
@@ -51,10 +51,38 @@ public class NodeManager extends Composi
 
   public NodeManager() {
     super(NodeManager.class.getName());
+  }
+
+  protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+      Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+    return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker);
+  }
+
+  protected NodeResourceMonitor createNodeResourceMonitor() {
+    return new NodeResourceMonitorImpl();
+  }
+
+  protected ContainerManagerImpl createContainerManager(Context context,
+      ContainerExecutor exec, DeletionService del,
+      NodeStatusUpdater nodeStatusUpdater) {
+    return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater);
+  }
+
+  protected WebServer createWebServer(Context nmContext,
+      ResourceView resourceView) {
+    return new WebServer(nmContext, resourceView);
+  }
+
+  protected void doSecureLogin() throws IOException {
+    SecurityUtil.login(getConfig(), NM_KEYTAB,
+        YarnServerConfig.NM_SERVER_PRINCIPAL_KEY);
+  }
+
+  @Override
+  public void init(Configuration conf) {
 
     Context context = new NMContext();
 
-    YarnConfiguration conf = new YarnConfiguration();
     ContainerExecutor exec = ReflectionUtils.newInstance(
         conf.getClass(NM_CONTAINER_EXECUTOR_CLASS,
           DefaultContainerExecutor.class, ContainerExecutor.class), conf);
@@ -90,35 +118,7 @@ public class NodeManager extends Composi
 
     dispatcher.register(ContainerManagerEventType.class, containerManager);
     addService(dispatcher);
-  }
-
-  protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-      Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
-    return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker);
-  }
 
-  protected NodeResourceMonitor createNodeResourceMonitor() {
-    return new NodeResourceMonitorImpl();
-  }
-
-  protected ContainerManagerImpl createContainerManager(Context context,
-      ContainerExecutor exec, DeletionService del,
-      NodeStatusUpdater nodeStatusUpdater) {
-    return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater);
-  }
-
-  protected WebServer createWebServer(Context nmContext,
-      ResourceView resourceView) {
-    return new WebServer(nmContext, resourceView);
-  }
-
-  protected void doSecureLogin() throws IOException {
-    SecurityUtil.login(getConfig(), NM_KEYTAB,
-        YarnServerConfig.NM_SERVER_PRINCIPAL_KEY);
-  }
-
-  @Override
-  public void init(Configuration conf) {
     Runtime.getRuntime().addShutdownHook(new Thread() {
           @Override
           public void run() {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/LocalizerStatus.java Tue May 10 15:18:58 2011
@@ -20,13 +20,14 @@ package org.apache.hadoop.yarn.server.no
 import java.util.List;
 
 public interface LocalizerStatus {
-  public String getLocalizerId();
-  public List<LocalResourceStatus> getResources();
 
-  public void setLocalizerId(String id);
-  public void addAllResources(List<LocalResourceStatus> resources);
-  public void addResourceStatus(LocalResourceStatus resource);
-  public LocalResourceStatus getResourceStatus(int index);
-  public void removeResource(int index);
-  public void clearResources();
+  String getLocalizerId();
+  void setLocalizerId(String id);
+
+  List<LocalResourceStatus> getResources();
+  void addAllResources(List<LocalResourceStatus> resources);
+  void addResourceStatus(LocalResourceStatus resource);
+  LocalResourceStatus getResourceStatus(int index);
+  void removeResource(int index);
+  void clearResources();
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Tue May 10 15:18:58 2011
@@ -22,6 +22,9 @@ import static org.apache.hadoop.fs.Creat
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
@@ -30,21 +33,27 @@ import java.util.concurrent.Callable;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 public class ContainerLaunch implements Callable<Integer> {
@@ -58,20 +67,17 @@ public class ContainerLaunch implements 
   private final ContainerExecutor exec;
   private final Application app;
   private final Container container;
-  private final Path sysDir;
-  private final Path appLogDir;
-  private final List<Path> appDirs;
-
-  public ContainerLaunch(Dispatcher dispatcher, ContainerExecutor exec,
-      Application app, Container container, Path sysDir, Path appLogDir,
-      List<Path> appDirs) {
+  private final Configuration conf;
+  private final LocalDirAllocator logDirsSelector;
+
+  public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
+      ContainerExecutor exec, Application app, Container container) {
+    this.conf = configuration;
     this.app = app;
     this.exec = exec;
-    this.sysDir = sysDir;
-    this.appLogDir = appLogDir;
-    this.appDirs = appDirs;
     this.container = container;
     this.dispatcher = dispatcher;
+    this.logDirsSelector = new LocalDirAllocator(NMConfig.NM_LOG_DIR);
   }
 
   @Override
@@ -84,51 +90,90 @@ public class ContainerLaunch implements 
     final List<String> command = launchContext.getCommandList();
     int ret = -1;
 
-    // Variable expansion
-    // Before the container script gets written out.
-    List<String> cmds = container.getLaunchContext().getCommandList();
-    List<String> newCmds = new ArrayList<String>(cmds.size());
-    String containerIdStr = ConverterUtils.toString(container.getContainerID());
-    Path containerLogDir = new Path(appLogDir, containerIdStr);
-    for (String str : cmds) {
-      newCmds.add(str.replace("<LOG_DIR>", containerLogDir.toUri()
-          .getPath()));
-    }
-    container.getLaunchContext().clearCommands();
-    container.getLaunchContext().addAllCommands(newCmds);
-
     try {
+      // /////////////////////////// Variable expansion
+      // Before the container script gets written out.
+      List<String> cmds = container.getLaunchContext().getCommandList();
+      List<String> newCmds = new ArrayList<String>(cmds.size());
+      String containerIdStr = ConverterUtils.toString(container.getContainerID());
+      String appIdStr = app.toString();
+      Path containerLogDir =
+          this.logDirsSelector.getLocalPathForWrite(appIdStr + Path.SEPARATOR
+              + containerIdStr, LocalDirAllocator.SIZE_UNKNOWN, this.conf,
+              false);
+      for (String str : cmds) {
+        newCmds.add(str.replace("<LOG_DIR>", containerLogDir.toUri()
+            .getPath()));
+      }
+      container.getLaunchContext().clearCommands();
+      container.getLaunchContext().addAllCommands(newCmds);
+      // /////////////////////////// End of variable expansion
+
       FileContext lfs = FileContext.getLocalFSFileContext();
-      Path launchSysDir = new Path(sysDir, containerIdStr);
-      lfs.mkdir(launchSysDir, null, true);
-      Path launchPath = new Path(launchSysDir, CONTAINER_SCRIPT);
-      Path tokensPath =
-          new Path(launchSysDir, String.format(
-              ContainerLocalizer.TOKEN_FILE_FMT, containerIdStr));
-      DataOutputStream launchOut = null;
-      DataOutputStream tokensOut = null;
-      
+      LocalDirAllocator lDirAllocator =
+          new LocalDirAllocator(NMConfig.NM_LOCAL_DIR); // TODO
+      Path nmPrivateContainerScriptPath =
+          lDirAllocator.getLocalPathForWrite(
+              ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+                  + appIdStr + Path.SEPARATOR + containerIdStr
+                  + Path.SEPARATOR + CONTAINER_SCRIPT, this.conf);
+      Path nmPrivateTokensPath =
+          lDirAllocator.getLocalPathForWrite(
+              ResourceLocalizationService.NM_PRIVATE_DIR
+                  + Path.SEPARATOR
+                  + containerIdStr
+                  + Path.SEPARATOR
+                  + String.format(ContainerLocalizer.TOKEN_FILE_FMT,
+                      containerIdStr), this.conf);
+      DataOutputStream containerScriptOutStream = null;
+      DataOutputStream tokensOutStream = null;
+
       try {
-        launchOut = lfs.create(launchPath, EnumSet.of(CREATE, OVERWRITE));
-        ContainerLocalizer.writeLaunchEnv(launchOut, env, localResources,
+        // /////////// Write out the container-script in the nmPrivate space.
+        String[] localDirs =
+            this.conf.getStrings(NMConfig.NM_LOCAL_DIR,
+                NMConfig.DEFAULT_NM_LOCAL_DIR);
+        List<Path> appDirs = new ArrayList<Path>(localDirs.length);
+        for (String localDir : localDirs) {
+          Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
+          Path userdir = new Path(usersdir, user);
+          Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+          appDirs.add(new Path(appsdir, appIdStr));
+        }
+        containerScriptOutStream =
+          lfs.create(nmPrivateContainerScriptPath,
+              EnumSet.of(CREATE, OVERWRITE));
+        writeLaunchEnv(containerScriptOutStream, env, localResources,
             command, appDirs);
-        
-        tokensOut = lfs.create(tokensPath, EnumSet.of(CREATE, OVERWRITE));
+        // /////////// End of writing out container-script
+
+        // /////////// Write out the container-tokens in the nmPrivate space.
+        tokensOutStream =
+            lfs.create(nmPrivateTokensPath, EnumSet.of(CREATE, OVERWRITE));
         Credentials creds = container.getCredentials();
-        creds.writeTokenStorageToStream(tokensOut);
+        creds.writeTokenStorageToStream(tokensOutStream);
+        // /////////// End of writing out container-tokens
       } finally {
-        IOUtils.cleanup(LOG, launchOut, tokensOut);
-        if (launchOut != null) {
-          launchOut.close();
-        }
+        IOUtils.cleanup(LOG, containerScriptOutStream, tokensOutStream);
       }
+
+      // Select the working directory for the container
+      Path containerWorkDir =
+          lDirAllocator.getLocalPathForWrite(ContainerLocalizer.USERCACHE
+              + Path.SEPARATOR + user + Path.SEPARATOR
+              + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
+              + Path.SEPARATOR + containerIdStr,
+              LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
+
+      // launchContainer is a blocking call. Reaching this point means the
+      // container is about to be launched, so send out the event now.
       dispatcher.getEventHandler().handle(new ContainerEvent(
             container.getContainerID(),
             ContainerEventType.CONTAINER_LAUNCHED));
 
       ret =
-        exec.launchContainer(container, launchSysDir, user, app.toString(),
-            appLogDir, appDirs);
+          exec.launchContainer(container, nmPrivateContainerScriptPath,
+              nmPrivateTokensPath, user, appIdStr, containerWorkDir);
       if (ret == ExitCode.KILLED.getExitCode()) {
         // If the process was killed, Send container_cleanedup_after_kill and
         // just break out of this method.
@@ -154,4 +199,99 @@ public class ContainerLaunch implements 
     return 0;
   }
 
+  private static class ShellScriptBuilder {
+    
+    private final StringBuilder sb;
+  
+    public ShellScriptBuilder() {
+      this(new StringBuilder("#!/bin/bash\n\n"));
+    }
+  
+    protected ShellScriptBuilder(StringBuilder sb) {
+      this.sb = sb;
+    }
+  
+    public ShellScriptBuilder env(String key, String value) {
+      line("export ", key, "=\"", value, "\"");
+      return this;
+    }
+  
+    public ShellScriptBuilder symlink(Path src, String dst) throws IOException {
+      return symlink(src, new Path(dst));
+    }
+  
+    public ShellScriptBuilder symlink(Path src, Path dst) throws IOException {
+      if (!src.isAbsolute()) {
+        throw new IOException("Source must be absolute");
+      }
+      if (dst.isAbsolute()) {
+        throw new IOException("Destination must be relative");
+      }
+      if (dst.toUri().getPath().indexOf('/') != -1) {
+        line("mkdir -p ", dst.getParent().toString());
+      }
+      line("ln -sf ", src.toUri().getPath(), " ", dst.toString());
+      return this;
+    }
+  
+    public void write(PrintStream out) throws IOException {
+      out.append(sb);
+    }
+  
+    public void line(String... command) {
+      for (String s : command) {
+        sb.append(s);
+      }
+      sb.append("\n");
+    }
+  
+    @Override
+    public String toString() {
+      return sb.toString();
+    }
+  
+  }
+
+  private static void writeLaunchEnv(OutputStream out,
+      Map<String,String> environment, Map<Path,String> resources,
+      List<String> command, List<Path> appDirs)
+      throws IOException {
+    ShellScriptBuilder sb = new ShellScriptBuilder();
+    if (System.getenv("YARN_HOME") != null) {
+      sb.env("YARN_HOME", System.getenv("YARN_HOME"));
+    }
+    sb.env(YARNApplicationConstants.LOCAL_DIR_ENV,
+        StringUtils.join(",", appDirs));
+    if (environment != null) {
+      for (Map.Entry<String,String> env : environment.entrySet()) {
+        sb.env(env.getKey().toString(), env.getValue().toString());
+      }
+    }
+    if (resources != null) {
+      for (Map.Entry<Path,String> link : resources.entrySet()) {
+        sb.symlink(link.getKey(), link.getValue());
+      }
+    }
+    ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
+    cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec ");
+    cmd.add("/bin/bash ");
+    cmd.add("-c ");
+    cmd.add("\"");
+    for (String cs : command) {
+      cmd.add(cs.toString());
+      cmd.add(" ");
+    }
+    cmd.add("\"");
+    sb.line(cmd.toArray(new String[cmd.size()]));
+    PrintStream pout = null;
+    try {
+      pout = new PrintStream(out);
+      sb.write(pout);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Tue May 10 15:18:58 2011
@@ -18,16 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
 
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOCAL_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_LOG_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOCAL_DIR;
-import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_LOG_DIR;
-
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -35,7 +28,7 @@ import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -45,7 +38,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -64,9 +56,6 @@ public class ContainersLauncher extends 
   private final Dispatcher dispatcher;
   private final ExecutorService containerLauncher =
     Executors.newCachedThreadPool();
-  private List<Path> logDirs;
-  private List<Path> localDirs;
-  private List<Path> sysDirs;
   private final Map<ContainerId,RunningContainer> running =
     Collections.synchronizedMap(new HashMap<ContainerId,RunningContainer>());
 
@@ -91,32 +80,11 @@ public class ContainersLauncher extends 
 
   @Override
   public void init(Configuration conf) {
-    // TODO factor this out of Localizer
     try {
       FileContext lfs = FileContext.getLocalFSFileContext(conf);
-      String[] sLocalDirs = conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR);
-
-      localDirs = new ArrayList<Path>(sLocalDirs.length);
-      logDirs = new ArrayList<Path>(sLocalDirs.length);
-      sysDirs = new ArrayList<Path>(sLocalDirs.length);
-      for (String sLocaldir : sLocalDirs) {
-        Path localdir = new Path(sLocaldir);
-        localDirs.add(localdir);
-        // $local/nmPrivate
-        Path sysdir = new Path(localdir, ResourceLocalizationService.NM_PRIVATE_DIR);
-        sysDirs.add(sysdir);
-      }
-      String[] sLogdirs = conf.getStrings(NM_LOG_DIR, DEFAULT_NM_LOG_DIR);
-      for (String sLogdir : sLogdirs) {
-        Path logdir = new Path(sLogdir);
-        logDirs.add(logdir);
-      }
-    } catch (IOException e) {
+    } catch (UnsupportedFileSystemException e) {
       throw new YarnException("Failed to start ContainersLauncher", e);
     }
-    localDirs = Collections.unmodifiableList(localDirs);
-    logDirs = Collections.unmodifiableList(logDirs);
-    sysDirs = Collections.unmodifiableList(sysDirs);
     super.init(conf);
   }
 
@@ -137,24 +105,10 @@ public class ContainersLauncher extends 
         Application app =
           context.getApplications().get(containerId.getAppId());
         String appIdStr = ConverterUtils.toString(app.getAppId());
-        List<Path> appDirs = new ArrayList<Path>(localDirs.size());
-        for (Path p : localDirs) {
-          Path usersdir = new Path(p, ContainerLocalizer.USERCACHE);
-          Path userdir = new Path(usersdir, userName);
-          Path appsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
-          appDirs.add(new Path(appsdir, appIdStr));
-        }
-        Path appSysDir =
-          new Path(sysDirs.get(0), appIdStr);
-        Path appLogDir =
-            new Path(logDirs.get(0), appIdStr);
-        // TODO: ROUND_ROBIN above.
         // TODO set in Application
-        //Path appLogDir = new Path(logDirs.get(0), app.toString());
-        ContainerLaunch launch =
-          new ContainerLaunch(dispatcher, exec, app,
-              event.getContainer(), appSysDir, appLogDir, appDirs);
-        // TODO: ROUND_ROBIN above.
+      ContainerLaunch launch =
+          new ContainerLaunch(getConfig(), dispatcher, exec, app,
+              event.getContainer());
         running.put(containerId,
             new RunningContainer(userName,
                 containerLauncher.submit(launch)));

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Tue May 10 15:18:58 2011
@@ -20,14 +20,8 @@ package org.apache.hadoop.yarn.server.no
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
-
 import java.net.InetSocketAddress;
-
 import java.security.PrivilegedAction;
-import java.security.PrivilegedExceptionAction;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -56,16 +50,12 @@ import org.apache.hadoop.security.Securi
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -91,7 +81,6 @@ public class ContainerLocalizer {
 
   private final String user;
   private final String appId;
-  private final Path logDir;
   private final List<Path> localDirs;
   private final String localizerId;
   private final FileContext lfs;
@@ -101,16 +90,8 @@ public class ContainerLocalizer {
   private final RecordFactory recordFactory;
   private final Map<LocalResource,Future<Path>> pendingResources;
 
-  public ContainerLocalizer(String user, String appId, String localizerId,
-      Path logDir, List<Path> localDirs) throws IOException {
-    this(FileContext.getLocalFSFileContext(), user, appId, localizerId, logDir,
-        localDirs, new HashMap<LocalResource,Future<Path>>(),
-        RecordFactoryProvider.getRecordFactory(null));
-  }
-
-  ContainerLocalizer(FileContext lfs, String user, String appId,
-      String localizerId, Path logDir, List<Path> localDirs,
-      Map<LocalResource,Future<Path>> pendingResources,
+  public ContainerLocalizer(FileContext lfs, String user, String appId,
+      String localizerId, List<Path> localDirs,
       RecordFactory recordFactory) throws IOException {
     if (null == user) {
       throw new IOException("Cannot initialize for null user");
@@ -121,7 +102,6 @@ public class ContainerLocalizer {
     this.lfs = lfs;
     this.user = user;
     this.appId = appId;
-    this.logDir = logDir;
     this.localDirs = localDirs;
     this.localizerId = localizerId;
     this.recordFactory = recordFactory;
@@ -130,7 +110,7 @@ public class ContainerLocalizer {
       new LocalDirAllocator(String.format(APPCACHE_CTXT_FMT, appId));
     this.userDirs =
       new LocalDirAllocator(String.format(USERCACHE_CTXT_FMT, appId));
-    this.pendingResources = pendingResources;
+    this.pendingResources = new HashMap<LocalResource,Future<Path>>();
   }
 
   LocalizationProtocol getProxy(final InetSocketAddress nmAddr) {
@@ -148,11 +128,12 @@ public class ContainerLocalizer {
   public int runLocalization(final InetSocketAddress nmAddr)
       throws IOException, InterruptedException {
     // load credentials
-    initDirs(conf, user, appId, lfs, logDir, localDirs);
+    initDirs(conf, user, appId, lfs, localDirs);
     final Credentials creds = new Credentials();
     DataInputStream credFile = null;
     try {
       // assume credentials in cwd
+      // TODO: Fix
       credFile = lfs.open(
           new Path(String.format(TOKEN_FILE_FMT, localizerId)));
       creds.readTokenStorageStream(credFile);
@@ -191,6 +172,8 @@ public class ContainerLocalizer {
       localizeFiles(nodeManager, exec, ugi);
       return 0;
     } catch (Throwable e) {
+      // Print traces to stdout so that they can be logged by the NM address
+      // space.
       e.printStackTrace(System.out);
       return -1;
     } finally {
@@ -201,6 +184,7 @@ public class ContainerLocalizer {
   }
 
   ExecutorService createDownloadThreadPool() {
+    // TODO: Only Single thread?
     return Executors.newSingleThreadExecutor();
   }
 
@@ -213,7 +197,7 @@ public class ContainerLocalizer {
     TimeUnit.SECONDS.sleep(duration);
   }
 
-  void localizeFiles(LocalizationProtocol nodemanager, ExecutorService exec,
+  private void localizeFiles(LocalizationProtocol nodemanager, ExecutorService exec,
       UserGroupInformation ugi) {
     while (true) {
       try {
@@ -236,6 +220,7 @@ public class ContainerLocalizer {
                 lda = appDirs;
                 break;
               }
+              // TODO: Synchronization??
               pendingResources.put(r, exec.submit(download(lda, r, ugi)));
             }
           }
@@ -263,9 +248,17 @@ public class ContainerLocalizer {
     }
   }
 
-  LocalizerStatus createStatus() throws InterruptedException {
+  /**
+   * Create the payload for the HeartBeat. Mainly the list of
+   * {@link LocalResourceStatus}es
+   * 
+   * @return a {@link LocalizerStatus} that can be sent via heartbeat.
+   * @throws InterruptedException
+   */
+  private LocalizerStatus createStatus() throws InterruptedException {
     final List<LocalResourceStatus> currentResources =
       new ArrayList<LocalResourceStatus>();
+    // TODO: Synchronization??
     for (Iterator<LocalResource> i = pendingResources.keySet().iterator();
          i.hasNext();) {
       LocalResource rsrc = i.next();
@@ -315,8 +308,7 @@ public class ContainerLocalizer {
       String locId = argv[2];
       InetSocketAddress nmAddr =
           new InetSocketAddress(argv[3], Integer.parseInt(argv[4]));
-      Path logDir = new Path(argv[5]);
-      String[] sLocaldirs = Arrays.copyOfRange(argv, 6, argv.length);
+      String[] sLocaldirs = Arrays.copyOfRange(argv, 5, argv.length);
       ArrayList<Path> localDirs = new ArrayList<Path>(sLocaldirs.length);
       for (String sLocaldir : sLocaldirs) {
         localDirs.add(new Path(sLocaldir));
@@ -330,7 +322,9 @@ public class ContainerLocalizer {
       }
 
       ContainerLocalizer localizer =
-          new ContainerLocalizer(user, appId, locId, logDir, localDirs);
+          new ContainerLocalizer(FileContext.getLocalFSFileContext(), user,
+              appId, locId, localDirs,
+              RecordFactoryProvider.getRecordFactory(null));
       System.exit(localizer.runLocalization(nmAddr));
     } catch (Throwable e) {
       // Print error to stdout so that LCE can use it.
@@ -339,130 +333,33 @@ public class ContainerLocalizer {
     }
   }
 
+  // TODO: Won't there be a race for creating these dirs?
   private static void initDirs(Configuration conf, String user, String appId,
-      FileContext lfs, Path logDir, List<Path> localDirs) throws IOException {
+      FileContext lfs, List<Path> localDirs) throws IOException {
     if (null == localDirs || 0 == localDirs.size()) {
       throw new IOException("Cannot initialize without local dirs");
     }
-    String[] appCacheDirs = new String[localDirs.size()];
-    String[] userCacheDirs = new String[localDirs.size()];
+    String[] appsFileCacheDirs = new String[localDirs.size()];
+    String[] usersFileCacheDirs = new String[localDirs.size()];
     for (int i = 0, n = localDirs.size(); i < n; ++i) {
       // $x/usercache/$user
       Path base = lfs.makeQualified(
           new Path(new Path(localDirs.get(i), USERCACHE), user));
       // $x/usercache/$user/filecache
-      Path uCacheDir = new Path(base, FILECACHE);
-      userCacheDirs[i] = uCacheDir.toString();
-      lfs.mkdir(uCacheDir, null, true);
+      Path userFileCacheDir = new Path(base, FILECACHE);
+      usersFileCacheDirs[i] = userFileCacheDir.toString();
+      lfs.mkdir(userFileCacheDir, null, false);
       // $x/usercache/$user/appcache/$appId
       Path appBase = new Path(base, new Path(APPCACHE, appId));
-      lfs.mkdir(appBase, null, true);
       // $x/usercache/$user/appcache/$appId/filecache
-      Path aCacheDir = new Path(appBase, FILECACHE);
-      appCacheDirs[i] = aCacheDir.toString();
-      lfs.mkdir(aCacheDir, null, true);
+      Path appFileCacheDir = new Path(appBase, FILECACHE);
+      appsFileCacheDirs[i] = appFileCacheDir.toString();
+      lfs.mkdir(appFileCacheDir, null, false);
       // $x/usercache/$user/appcache/$appId/output
-      lfs.mkdir(new Path(appBase, OUTPUTDIR), null, true);
-    }
-    conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appCacheDirs);
-    conf.setStrings(String.format(USERCACHE_CTXT_FMT, appId), userCacheDirs);
-    Path appLogDir = new Path(logDir, appId);
-    lfs.mkdir(appLogDir, null, true);
-  }
-
-  public static void writeLaunchEnv(OutputStream out,
-      Map<String,String> environment, Map<Path,String> resources,
-      List<String> command, List<Path> appDirs)
-      throws IOException {
-    ShellScriptBuilder sb = new ShellScriptBuilder();
-    if (System.getenv("YARN_HOME") != null) {
-      sb.env("YARN_HOME", System.getenv("YARN_HOME"));
-    }
-    sb.env(YARNApplicationConstants.LOCAL_DIR_ENV,
-        StringUtils.join(",", appDirs));
-    if (environment != null) {
-      for (Map.Entry<String,String> env : environment.entrySet()) {
-        sb.env(env.getKey().toString(), env.getValue().toString());
-      }
-    }
-    if (resources != null) {
-      for (Map.Entry<Path,String> link : resources.entrySet()) {
-        sb.symlink(link.getKey(), link.getValue());
-      }
-    }
-    ArrayList<String> cmd = new ArrayList<String>(2 * command.size() + 5);
-    cmd.add(ContainerExecutor.isSetsidAvailable ? "exec setsid " : "exec ");
-    cmd.add("/bin/bash ");
-    cmd.add("-c ");
-    cmd.add("\"");
-    for (String cs : command) {
-      cmd.add(cs.toString());
-      cmd.add(" ");
-    }
-    cmd.add("\"");
-    sb.line(cmd.toArray(new String[cmd.size()]));
-    PrintStream pout = null;
-    try {
-      pout = new PrintStream(out);
-      sb.write(pout);
-    } finally {
-      if (out != null) {
-        out.close();
-      }
-    }
-  }
-
-  private static class ShellScriptBuilder {
-
-    private final StringBuilder sb;
-
-    public ShellScriptBuilder() {
-      this(new StringBuilder("#!/bin/bash\n\n"));
-    }
-
-    protected ShellScriptBuilder(StringBuilder sb) {
-      this.sb = sb;
-    }
-
-    public ShellScriptBuilder env(String key, String value) {
-      line("export ", key, "=\"", value, "\"");
-      return this;
-    }
-
-    public ShellScriptBuilder symlink(Path src, String dst) throws IOException {
-      return symlink(src, new Path(dst));
-    }
-
-    public ShellScriptBuilder symlink(Path src, Path dst) throws IOException {
-      if (!src.isAbsolute()) {
-        throw new IOException("Source must be absolute");
-      }
-      if (dst.isAbsolute()) {
-        throw new IOException("Destination must be relative");
-      }
-      if (dst.toUri().getPath().indexOf('/') != -1) {
-        line("mkdir -p ", dst.getParent().toString());
-      }
-      line("ln -sf ", src.toUri().getPath(), " ", dst.toString());
-      return this;
-    }
-
-    public void write(PrintStream out) throws IOException {
-      out.append(sb);
-    }
-
-    public void line(String... command) {
-      for (String s : command) {
-        sb.append(s);
-      }
-      sb.append("\n");
+      lfs.mkdir(new Path(appBase, OUTPUTDIR), null, false);
     }
-
-    @Override
-    public String toString() {
-      return sb.toString();
-    }
-
+    conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs);
+    conf.setStrings(String.format(USERCACHE_CTXT_FMT, appId), usersFileCacheDirs);
   }
 
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTracker.java Tue May 10 15:18:58 2011
@@ -18,11 +18,17 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 
-public interface LocalResourcesTracker extends EventHandler<ResourceEvent> {
+/**
+ * Component tracking resources, all of the same {@link LocalResourceVisibility}.
+ * 
+ */
+interface LocalResourcesTracker extends EventHandler<ResourceEvent> {
 
-  public boolean contains(LocalResourceRequest resource);
+  // TODO: Not used at all!!
+  boolean contains(LocalResourceRequest resource);
 
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Tue May 10 15:18:58 2011
@@ -21,9 +21,15 @@ import java.util.concurrent.ConcurrentHa
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
 
+/**
+ * A collection of {@link LocalizedResource}s, all of the same
+ * {@link LocalResourceVisibility}.
+ * 
+ */
 class LocalResourcesTrackerImpl implements LocalResourcesTracker {
 
   static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
@@ -36,8 +42,9 @@ class LocalResourcesTrackerImpl implemen
     this.dispatcher = dispatcher;
   }
 
+  @Override
   public void handle(ResourceEvent event) {
-    LocalResourceRequest req = event.getLocalResource();
+    LocalResourceRequest req = event.getLocalResourceRequest();
     LocalizedResource rsrc = localrsrc.get(req);
     switch (event.getType()) {
     case REQUEST:
@@ -57,6 +64,7 @@ class LocalResourcesTrackerImpl implemen
     rsrc.handle(event);
   }
 
+  @Override
   public boolean contains(LocalResourceRequest resource) {
     return localrsrc.contains(resource);
   }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java?rev=1101502&r1=1101501&r2=1101502&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalizedResource.java Tue May 10 15:18:58 2011
@@ -41,6 +41,11 @@ import org.apache.hadoop.yarn.state.Sing
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 
+/**
+ * Datum representing a localized resource. Holds the state machine of a
+ * resource. The state of the resource is one of {@link ResourceState}.
+ * 
+ */
 public class LocalizedResource implements EventHandler<ResourceEvent> {
 
   private static final Log LOG = LogFactory.getLog(LocalizedResource.class);
@@ -52,13 +57,15 @@ public class LocalizedResource implement
   final StateMachine<ResourceState,ResourceEventType,ResourceEvent>
     stateMachine;
   final Semaphore sem = new Semaphore(1);
-  final Queue<ContainerId> ref;
+  final Queue<ContainerId> ref; // Queue of containers using this localized
+                                // resource
   final AtomicLong timestamp = new AtomicLong(currentTime());
 
   private static final StateMachineFactory<LocalizedResource,ResourceState,
       ResourceEventType,ResourceEvent> stateMachineFactory =
         new StateMachineFactory<LocalizedResource,ResourceState,
           ResourceEventType,ResourceEvent>(ResourceState.INIT)
+
     // From INIT (ref == 0, awaiting req)
     .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
         ResourceEventType.REQUEST, new FetchResourceTransition())
@@ -66,14 +73,16 @@ public class LocalizedResource implement
         ResourceEventType.LOCALIZED, new FetchDirectTransition())
     .addTransition(ResourceState.INIT, ResourceState.INIT,
         ResourceEventType.RELEASE, new ReleaseTransition())
+
     // From DOWNLOADING (ref > 0, may be localizing)
     .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
-        ResourceEventType.REQUEST, new FetchResourceTransition())
+        ResourceEventType.REQUEST, new FetchResourceTransition()) // TODO: Duplicate addition!!
     .addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED,
         ResourceEventType.LOCALIZED, new FetchSuccessTransition())
     .addTransition(ResourceState.DOWNLOADING,
         EnumSet.of(ResourceState.DOWNLOADING, ResourceState.INIT),
         ResourceEventType.RELEASE, new ReleasePendingTransition())
+
     // From LOCALIZED (ref >= 0, on disk)
     .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
         ResourceEventType.REQUEST, new LocalizedResourceTransition())
@@ -132,8 +141,10 @@ public class LocalizedResource implement
     sem.release();
   }
 
+  @Override
   public synchronized void handle(ResourceEvent event) {
     stateMachine.doTransition(event.getType(), event);
+    // TODO: Invalid transitions?
   }
 
   static abstract class ResourceTransition implements