Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:42:02 UTC

svn commit: r1077111 [2/3] - in /hadoop/common/branches/branch-0.20-security-patches: ./ conf/ src/c++/task-controller/ src/c++/task-controller/tests/ src/contrib/ src/contrib/streaming/src/test/org/apache/hadoop/streaming/ src/docs/src/documentation/c...

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar  4 03:42:01 2011
@@ -24,12 +24,11 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -73,52 +72,27 @@ class LinuxTaskController extends TaskCo
         new File(hadoopBin, "task-controller").getAbsolutePath();
   }
   
-  // The list of directory paths specified in the
-  // variable mapred.local.dir. This is used to determine
-  // which among the list of directories is picked up
-  // for storing data for a particular task.
-  private String[] mapredLocalDirs;
-  
-  // permissions to set on files and directories created.
-  // When localized files are handled securely, this string
-  // will change to something more restrictive. Until then,
-  // it opens up the permissions for all, so that the tasktracker
-  // and job owners can access files together.
-  private static final String FILE_PERMISSIONS = "ugo+rwx";
-  
-  // permissions to set on components of the path leading to
-  // localized files and directories. Read and execute permissions
-  // are required for different users to be able to access the
-  // files.
-  private static final String PATH_PERMISSIONS = "go+rx";
-  
   public LinuxTaskController() {
     super();
   }
   
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-    mapredLocalDirs = conf.getStrings("mapred.local.dir");
-    //Setting of the permissions of the local directory is done in 
-    //setup()
-  }
-  
   /**
    * List of commands that the setuid script will execute.
    */
   enum TaskCommands {
+    INITIALIZE_JOB,
     LAUNCH_TASK_JVM,
+    INITIALIZE_TASK,
     TERMINATE_TASK_JVM,
-    KILL_TASK_JVM
+    KILL_TASK_JVM,
   }
-  
+
   /**
    * Launch a task JVM that will run as the owner of the job.
    * 
-   * This method launches a task JVM by executing a setuid
-   * executable that will switch to the user and run the
-   * task.
+   * This method launches a task JVM by executing a setuid executable that will
+   * switch to the user and run the task. Also does initialization of the first
+   * task in the same setuid process launch.
    */
   @Override
   void launchTaskJVM(TaskController.TaskControllerContext context) 
@@ -150,48 +124,103 @@ class LinuxTaskController extends TaskCo
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
                                     TaskCommands.LAUNCH_TASK_JVM, 
                                     env.conf.getUser(),
-                                    launchTaskJVMArgs, env);
+                                    launchTaskJVMArgs, env.workDir, env.env);
     context.shExec = shExec;
     try {
       shExec.execute();
     } catch (Exception e) {
-      LOG.warn("Exception thrown while launching task JVM : " + 
-          StringUtils.stringifyException(e));
-      LOG.warn("Exit code from task is : " + shExec.getExitCode());
-      LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+      int exitCode = shExec.getExitCode();
+      LOG.warn("Exit code from task is : " + exitCode);
+      // 143 (SIGTERM) and 137 (SIGKILL) exit codes mean the task was
+      // terminated/killed forcefully. In all other cases, log the
+      // task-controller output
+      if (exitCode != 143 && exitCode != 137) {
+        LOG.warn("Exception thrown while launching task JVM : "
+            + StringUtils.stringifyException(e));
+        LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+        logOutput(shExec.getOutput());
+      }
       throw new IOException(e);
     }
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("output after executing task jvm = " + shExec.getOutput()); 
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+      logOutput(shExec.getOutput());
     }
   }
 
   /**
-   * Returns list of arguments to be passed while launching task VM.
-   * See {@code buildTaskControllerExecutor(TaskCommands, 
-   * String, List<String>, JvmEnv)} documentation.
+   * Helper method that runs a LinuxTaskController command
+   * 
+   * @param taskCommand
+   * @param user
+   * @param cmdArgs
+   * @param env
+   * @throws IOException
+   */
+  private void runCommand(TaskCommands taskCommand, String user,
+      List<String> cmdArgs, File workDir, Map<String, String> env)
+      throws IOException {
+
+    ShellCommandExecutor shExec =
+        buildTaskControllerExecutor(taskCommand, user, cmdArgs, workDir, env);
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      LOG.warn("Exit code from " + taskCommand.toString() + " is : "
+          + shExec.getExitCode());
+      LOG.warn("Exception thrown by " + taskCommand.toString() + " : "
+          + StringUtils.stringifyException(e));
+      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+          + " follows:");
+      logOutput(shExec.getOutput());
+      throw new IOException(e);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Output from LinuxTaskController's " + taskCommand.toString()
+          + " follows:");
+      logOutput(shExec.getOutput());
+    }
+  }
+
+  /**
+   * Returns list of arguments to be passed while initializing a new task. See
+   * {@code buildTaskControllerExecutor(TaskCommands, String, List<String>,
+   * JvmEnv)} documentation.
+   * 
    * @param context
    * @return Argument to be used while launching Task VM
    */
-  private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
+  private List<String> buildInitializeTaskArgs(TaskControllerContext context) {
     List<String> commandArgs = new ArrayList<String>(3);
     String taskId = context.task.getTaskID().toString();
     String jobId = getJobId(context);
-    LOG.debug("getting the task directory as: " 
-        + getTaskCacheDirectory(context));
-    commandArgs.add(getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context)), 
-        context));
     commandArgs.add(jobId);
-    if(!context.task.isTaskCleanupTask()) {
+    if (!context.task.isTaskCleanupTask()) {
       commandArgs.add(taskId);
-    }else {
+    } else {
       commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
     }
     return commandArgs;
   }
-  
-  // get the Job ID from the information in the TaskControllerContext
+
+  @Override
+  void initializeTask(TaskControllerContext context)
+      throws IOException {
+    LOG.debug("Going to do " + TaskCommands.INITIALIZE_TASK.toString()
+        + " for " + context.task.getTaskID().toString());
+    runCommand(TaskCommands.INITIALIZE_TASK, context.env.conf.getUser(),
+        buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
+  }
+
+  private void logOutput(String output) {
+    String shExecOutput = output;
+    if (shExecOutput != null) {
+      for (String str : shExecOutput.split("\n")) {
+        LOG.info(str);
+      }
+    }
+  }
+
   private String getJobId(TaskControllerContext context) {
     String taskId = context.task.getTaskID().toString();
     TaskAttemptID tId = TaskAttemptID.forName(taskId);
@@ -199,6 +228,27 @@ class LinuxTaskController extends TaskCo
     return jobId;
   }
 
+  /**
+   * Returns list of arguments to be passed while launching task VM.
+   * See {@code buildTaskControllerExecutor(TaskCommands, 
+   * String, List<String>, JvmEnv)} documentation.
+   * @param context
+   * @return Argument to be used while launching Task VM
+   */
+  private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
+    List<String> commandArgs = new ArrayList<String>(3);
+    LOG.debug("getting the task directory as: " 
+        + getTaskCacheDirectory(context));
+    LOG.debug("getting the tt_root as " + getDirectoryChosenForTask(
+        new File(getTaskCacheDirectory(context)), 
+        context));
+    commandArgs.add(getDirectoryChosenForTask(
+        new File(getTaskCacheDirectory(context)), 
+        context));
+    commandArgs.addAll(buildInitializeTaskArgs(context));
+    return commandArgs;
+  }
+
   // Get the directory from the list of directories configured
   // in mapred.local.dir chosen for storing data pertaining to
   // this task.
@@ -208,8 +258,8 @@ class LinuxTaskController extends TaskCo
     String taskId = context.task.getTaskID().toString();
     for (String dir : mapredLocalDirs) {
       File mapredDir = new File(dir);
-      File taskDir = new File(mapredDir, TaskTracker.getLocalTaskDir(
-          jobId, taskId, context.task.isTaskCleanupTask()));
+      File taskDir = new File(mapredDir, TaskTracker.getTaskWorkDir(
+          jobId, taskId, context.task.isTaskCleanupTask())).getParentFile();
       if (directory.equals(taskDir)) {
         return dir;
       }
@@ -219,68 +269,7 @@ class LinuxTaskController extends TaskCo
     throw new IllegalArgumentException("invalid task cache directory "
                 + directory.getAbsolutePath());
   }
-  
-  /**
-   * Setup appropriate permissions for directories and files that
-   * are used by the task.
-   * 
-   * As the LinuxTaskController launches tasks as a user, different
-   * from the daemon, all directories and files that are potentially 
-   * used by the tasks are setup with appropriate permissions that
-   * will allow access.
-   * 
-   * Until secure data handling is implemented (see HADOOP-4491 and
-   * HADOOP-4493, for e.g.), the permissions are set up to allow
-   * read, write and execute access for everyone. This will be 
-   * changed to restricted access as data is handled securely.
-   */
-  void initializeTask(TaskControllerContext context) {
-    // Setup permissions for the job and task cache directories.
-    setupTaskCacheFileAccess(context);
-    // setup permissions for task log directory
-    setupTaskLogFileAccess(context);    
-  }
-  
-  // Allows access for the task to create log files under 
-  // the task log directory
-  private void setupTaskLogFileAccess(TaskControllerContext context) {
-    TaskAttemptID taskId = context.task.getTaskID();
-    File f = TaskLog.getTaskLogFile(taskId, TaskLog.LogName.SYSLOG);
-    String taskAttemptLogDir = f.getParentFile().getAbsolutePath();
-    changeDirectoryPermissions(taskAttemptLogDir, FILE_PERMISSIONS, false);
-  }
 
-  // Allows access for the task to read, write and execute 
-  // the files under the job and task cache directories
-  private void setupTaskCacheFileAccess(TaskControllerContext context) {
-    String taskId = context.task.getTaskID().toString();
-    JobID jobId = JobID.forName(getJobId(context));
-    //Change permission for the task across all the disks
-    for(String localDir : mapredLocalDirs) {
-      File f = new File(localDir);
-      File taskCacheDir = new File(f,TaskTracker.getLocalTaskDir(
-          jobId.toString(), taskId, context.task.isTaskCleanupTask()));
-      if(taskCacheDir.exists()) {
-        changeDirectoryPermissions(taskCacheDir.getPath(), 
-            FILE_PERMISSIONS, true);
-      }          
-    }//end of local directory Iteration 
-  }
-
-  // convenience method to execute chmod.
-  private void changeDirectoryPermissions(String dir, String mode, 
-                                              boolean isRecursive) {
-    int ret = 0;
-    try {
-      ret = FileUtil.chmod(dir, mode, isRecursive);
-    } catch (Exception e) {
-      LOG.warn("Exception in changing permissions for directory " + dir + 
-                  ". Exception: " + e.getMessage());
-    }
-    if (ret != 0) {
-      LOG.warn("Could not change permissions for directory " + dir);
-    }
-  }
   /**
    * Builds the command line for launching/terminating/killing task JVM.
    * Following is the format for launching/terminating/killing task JVM
@@ -295,14 +284,15 @@ class LinuxTaskController extends TaskCo
    * @param command command to be executed.
    * @param userName user name
    * @param cmdArgs list of extra arguments
+   * @param workDir working directory for the task-controller
    * @param env JVM environment variables.
    * @return {@link ShellCommandExecutor}
    * @throws IOException
    */
-  private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command, 
-                                          String userName, 
-                                          List<String> cmdArgs, JvmEnv env) 
-                                    throws IOException {
+  private ShellCommandExecutor buildTaskControllerExecutor(
+      TaskCommands command, String userName, List<String> cmdArgs,
+      File workDir, Map<String, String> env)
+      throws IOException {
     String[] taskControllerCmd = new String[3 + cmdArgs.size()];
     taskControllerCmd[0] = getTaskControllerExecutablePath();
     taskControllerCmd[1] = userName;
@@ -317,9 +307,9 @@ class LinuxTaskController extends TaskCo
       }
     }
     ShellCommandExecutor shExec = null;
-    if(env.workDir != null && env.workDir.exists()) {
+    if(workDir != null && workDir.exists()) {
       shExec = new ShellCommandExecutor(taskControllerCmd,
-          env.workDir, env.env);
+          workDir, env);
     } else {
       shExec = new ShellCommandExecutor(taskControllerCmd);
     }
@@ -376,66 +366,20 @@ class LinuxTaskController extends TaskCo
     return taskControllerExe;
   }  
 
-  /**
-   * Sets up the permissions of the following directories:
-   * 
-   * Job cache directory
-   * Archive directory
-   * Hadoop log directories
-   * 
-   */
-  @Override
-  void setup() {
-    //set up job cache directory and associated permissions
-    String localDirs[] = this.mapredLocalDirs;
-    for(String localDir : localDirs) {
-      //Cache root
-      File cacheDirectory = new File(localDir,TaskTracker.getCacheSubdir());
-      File jobCacheDirectory = new File(localDir,TaskTracker.getJobCacheSubdir());
-      if(!cacheDirectory.exists()) {
-        if(!cacheDirectory.mkdirs()) {
-          LOG.warn("Unable to create cache directory : " + 
-              cacheDirectory.getPath());
-        }
-      }
-      if(!jobCacheDirectory.exists()) {
-        if(!jobCacheDirectory.mkdirs()) {
-          LOG.warn("Unable to create job cache directory : " + 
-              jobCacheDirectory.getPath());
-        }
-      }
-      //Give world writable permission for every directory under
-      //mapred-local-dir.
-      //Child tries to write files under it when executing.
-      changeDirectoryPermissions(localDir, FILE_PERMISSIONS, true);
-    }//end of local directory manipulations
-    //setting up perms for user logs
-    File taskLog = TaskLog.getUserLogDir();
-    changeDirectoryPermissions(taskLog.getPath(), FILE_PERMISSIONS,false);
+  private List<String> buildInitializeJobCommandArgs(
+      JobInitializationContext context) {
+    List<String> initJobCmdArgs = new ArrayList<String>();
+    initJobCmdArgs.add(context.jobid.toString());
+    return initJobCmdArgs;
   }
 
-  /*
-   * Create Job directories across disks and set their permissions to 777
-   * This way when tasks are run we just need to setup permissions for
-   * task folder.
-   */
   @Override
-  void initializeJob(JobID jobid) {
-    for(String localDir : this.mapredLocalDirs) {
-      File jobDirectory = new File(localDir, 
-          TaskTracker.getLocalJobDir(jobid.toString()));
-      if(!jobDirectory.exists()) {
-        if(!jobDirectory.mkdir()) {
-          LOG.warn("Unable to create job cache directory : " 
-              + jobDirectory.getPath());
-          continue;
-        }
-      }
-      //Should be recursive because the jar and work folders might be 
-      //present under the job cache directory
-      changeDirectoryPermissions(
-          jobDirectory.getPath(), FILE_PERMISSIONS, true);
-    }
+  void initializeJob(JobInitializationContext context)
+      throws IOException {
+    LOG.debug("Going to initialize job " + context.jobid.toString()
+        + " on the TT");
+    runCommand(TaskCommands.INITIALIZE_JOB, context.user,
+        buildInitializeJobCommandArgs(context), context.workDir, null);
   }
   
   /**
@@ -470,7 +414,7 @@ class LinuxTaskController extends TaskCo
     }
     ShellCommandExecutor shExec = buildTaskControllerExecutor(
         command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env);
+        buildKillTaskCommandArgs(context), context.env.workDir, context.env.env);
     try {
       shExec.execute();
     } catch (Exception e) {

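For context on the hunks above: buildTaskControllerExecutor() now takes the
working directory and environment map directly instead of the whole JvmEnv,
and every task-controller invocation shares the same command-line shape. The
sketch below reconstructs that shape from the array sizing visible in the
hunk; using the TaskCommands ordinal in slot 2 is an assumption, since that
assignment falls outside the diff context shown here.

    // Sketch, not the committed code: the command array built in
    // buildTaskControllerExecutor(), with slot 2 assumed to carry the
    // enum ordinal that the setuid binary switches on.
    String[] taskControllerCmd = new String[3 + cmdArgs.size()];
    taskControllerCmd[0] = getTaskControllerExecutablePath(); // setuid binary
    taskControllerCmd[1] = userName;                          // job owner
    taskControllerCmd[2] = String.valueOf(command.ordinal()); // assumed encoding
    int i = 3;
    for (String cmdArg : cmdArgs) {
      taskControllerCmd[i++] = cmdArg;                        // per-command args
    }

The exit codes special-cased in launchTaskJVM() follow the usual shell
convention of 128 + signal number: 143 = 128 + SIGTERM(15) and
137 = 128 + SIGKILL(9), which is why those two are treated as deliberate
termination rather than logged as failures.
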
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:42:01 2011
@@ -21,12 +21,17 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.mapred.JobTrackerMetricsInst;
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
@@ -62,7 +67,7 @@ class LocalJobRunner implements JobSubmi
 
     private JobStatus status;
     private ArrayList<TaskAttemptID> mapIds = new ArrayList<TaskAttemptID>();
-    private MapOutputFile mapoutputFile;
+
     private JobProfile profile;
     private Path localFile;
     private FileSystem localFs;
@@ -83,8 +88,6 @@ class LocalJobRunner implements JobSubmi
       this.systemJobDir = new Path(jobSubmitDir);
       this.file = new Path(systemJobDir, "job.xml");
       this.id = jobid;
-      this.mapoutputFile = new MapOutputFile(jobid);
-      this.mapoutputFile.setConf(conf);
 
       this.localFile = new JobConf(conf).getLocalPath(jobDir+id+".xml");
       this.localFs = FileSystem.getLocal(conf);
@@ -120,7 +123,9 @@ class LocalJobRunner implements JobSubmi
         }
         outputCommitter.setupJob(jContext);
         status.setSetupProgress(1.0f);
-        
+
+        Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
+          new HashMap<TaskAttemptID, MapOutputFile>();
         for (int i = 0; i < taskSplitMetaInfos.length; i++) {
           if (!this.isInterrupted()) {
             TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
@@ -129,6 +134,12 @@ class LocalJobRunner implements JobSubmi
                                       mapId, i,
                                       taskSplitMetaInfos[i].getSplitIndex(), 1);
             JobConf localConf = new JobConf(job);
+            TaskRunner.setupChildMapredLocalDirs(map, localConf);
+
+            MapOutputFile mapOutput = new MapOutputFile();
+            mapOutput.setConf(localConf);
+            mapOutputFiles.put(mapId, mapOutput);
+
             map.setJobFile(localFile.toString());
             map.localizeConfiguration(localConf);
             map.setConf(localConf);
@@ -146,14 +157,21 @@ class LocalJobRunner implements JobSubmi
           new TaskAttemptID(new TaskID(jobId, false, 0), 0);
         try {
           if (numReduceTasks > 0) {
+            ReduceTask reduce =
+                new ReduceTask(file.toString(), reduceId, 0, mapIds.size(),
+                    1);
+            JobConf localConf = new JobConf(job);
+            TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
             // move map output to reduce input  
             for (int i = 0; i < mapIds.size(); i++) {
               if (!this.isInterrupted()) {
                 TaskAttemptID mapId = mapIds.get(i);
-                Path mapOut = this.mapoutputFile.getOutputFile(mapId);
-                Path reduceIn = this.mapoutputFile.getInputFileForWrite(
-                                  mapId.getTaskID(),reduceId,
-                                  localFs.getLength(mapOut));
+                Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
+                MapOutputFile localOutputFile = new MapOutputFile();
+                localOutputFile.setConf(localConf);
+                Path reduceIn =
+                  localOutputFile.getInputFileForWrite(mapId.getTaskID(),
+                        localFs.getFileStatus(mapOut).getLen());
                 if (!localFs.mkdirs(reduceIn.getParent())) {
                   throw new IOException("Mkdirs failed to create "
                       + reduceIn.getParent().toString());
@@ -165,10 +183,6 @@ class LocalJobRunner implements JobSubmi
               }
             }
             if (!this.isInterrupted()) {
-              ReduceTask reduce = new ReduceTask(file.toString(), 
-                                                 reduceId, 0, mapIds.size(), 
-                                                 1);
-              JobConf localConf = new JobConf(job);
               reduce.setJobFile(localFile.toString());
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
@@ -183,11 +197,8 @@ class LocalJobRunner implements JobSubmi
             }
           }
         } finally {
-          for (TaskAttemptID mapId: mapIds) {
-            this.mapoutputFile.removeAll(mapId);
-          }
-          if (numReduceTasks == 1) {
-            this.mapoutputFile.removeAll(reduceId);
+          for (MapOutputFile output : mapOutputFiles.values()) {
+            output.removeAll();
           }
         }
         // delete the temporary directory in output directory

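The LocalJobRunner hunks replace the single job-wide MapOutputFile with one
instance per map attempt, each bound to that attempt's localized
configuration. A condensed sketch of the pattern, assuming the 0.20 APIs
that appear in the diff:

    // One MapOutputFile per attempt, keyed by its TaskAttemptID.
    Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
        new HashMap<TaskAttemptID, MapOutputFile>();

    JobConf localConf = new JobConf(job);
    TaskRunner.setupChildMapredLocalDirs(map, localConf); // task-specific dirs
    MapOutputFile mapOutput = new MapOutputFile();
    mapOutput.setConf(localConf);         // paths now resolve per attempt
    mapOutputFiles.put(mapId, mapOutput);

    // Cleanup needs no task id any more: each instance knows its own tree.
    for (MapOutputFile output : mapOutputFiles.values()) {
      output.removeAll();
    }

This is what allows the id-free MapOutputFile API later in this commit: the
distinguishing state moves from method parameters into the per-task
mapred.local.dir configuration.
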
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java Fri Mar  4 03:42:01 2011
@@ -30,144 +30,152 @@ import org.apache.hadoop.fs.Path;
 class MapOutputFile {
 
   private JobConf conf;
-  private JobID jobId;
-  
-  MapOutputFile() {
-  }
 
-  MapOutputFile(JobID jobId) {
-    this.jobId = jobId;
+  static final String REDUCE_INPUT_FILE_FORMAT_STRING = "%s/map_%d.out";
+
+  MapOutputFile() {
   }
 
   private LocalDirAllocator lDirAlloc = 
                             new LocalDirAllocator("mapred.local.dir");
   
-  /** Return the path to local map output file created earlier
-   * @param mapTaskId a map task id
-   */
-  public Path getOutputFile(TaskAttemptID mapTaskId)
-    throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/file.out", conf);
+  /**
+   * Return the path to local map output file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+        + "file.out", conf);
   }
 
-  /** Create a local map output file name.
-   * @param mapTaskId a map task id
+  /**
+   * Create a local map output file name.
+   * 
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
-    throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/file.out", size, conf);
-  }
-
-  /** Return the path to a local map output index file created earlier
-   * @param mapTaskId a map task id
-   */
-  public Path getOutputIndexFile(TaskAttemptID mapTaskId)
-    throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/file.out.index", conf);
-  }
-
-  /** Create a local map output index file name.
-   * @param mapTaskId a map task id
+  public Path getOutputFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+        + "file.out", size, conf);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + Path.SEPARATOR
+        + "file.out.index", conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   * 
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
-    throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/file.out.index", 
-                       size, conf);
+  public Path getOutputIndexFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + Path.SEPARATOR
+        + "file.out.index", size, conf);
   }
 
-  /** Return a local map spill file created earlier.
-   * @param mapTaskId a map task id
+  /**
+   * Return a local map spill file created earlier.
+   * 
    * @param spillNumber the number
+   * @return path
+   * @throws IOException
    */
-  public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
-    throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/spill" 
-                       + spillNumber + ".out", conf);
+  public Path getSpillFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+        + spillNumber + ".out", conf);
   }
 
-  /** Create a local map spill file name.
-   * @param mapTaskId a map task id
+  /**
+   * Create a local map spill file name.
+   * 
    * @param spillNumber the number
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber, 
-         long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/spill" + 
-                       spillNumber + ".out", size, conf);
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+        + spillNumber + ".out", size, conf);
   }
 
-  /** Return a local map spill index file created earlier
-   * @param mapTaskId a map task id
+  /**
+   * Return a local map spill index file created earlier
+   * 
    * @param spillNumber the number
+   * @return path
+   * @throws IOException
    */
-  public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
-    throws IOException {
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/spill" + 
-                       spillNumber + ".out.index", conf);
+  public Path getSpillIndexFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(TaskTracker.OUTPUT + "/spill"
+        + spillNumber + ".out.index", conf);
   }
 
-  /** Create a local map spill index file name.
-   * @param mapTaskId a map task id
+  /**
+   * Create a local map spill index file name.
+   * 
    * @param spillNumber the number
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
-         long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), mapTaskId.toString())
-                       + "/spill" + spillNumber + 
-                       ".out.index", size, conf);
-  }
-
-  /** Return a local reduce input file created earlier
-   * @param mapTaskId a map task id
-   * @param reduceTaskId a reduce task id
-   */
-  public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
-    throws IOException {
-    // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), reduceTaskId.toString())
-                       + "/map_" + mapId + ".out",
-                       conf);
-  }
-
-  /** Create a local reduce input file name.
-   * @param mapTaskId a map task id
-   * @param reduceTaskId a reduce task id
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.OUTPUT + "/spill"
+        + spillNumber + ".out.index", size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   * 
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException 
+   */
+  public Path getInputFile(int mapId)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(String.format(
+        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, Integer
+            .valueOf(mapId)), conf);
+  }
+
+  /**
+   * Create a local reduce input file name.
+   * 
+   * @param mapId a map task id
    * @param size the size of the file
+   * @return path
+   * @throws IOException
    */
-  public Path getInputFileForWrite(TaskID mapId, TaskAttemptID reduceTaskId, 
-                                   long size)
-    throws IOException {
-    // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
-                       jobId.toString(), reduceTaskId.toString())
-                       + "/map_" + mapId.getId() + ".out", 
-                       size, conf);
+  public Path getInputFileForWrite(TaskID mapId, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        REDUCE_INPUT_FILE_FORMAT_STRING, TaskTracker.OUTPUT, mapId.getId()),
+        size, conf);
   }
 
   /** Removes all of the files related to a task. */
-  public void removeAll(TaskAttemptID taskId) throws IOException {
-    conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
-                          jobId.toString(), taskId.toString())
-);
+  public void removeAll()
+      throws IOException {
+    conf.deleteLocalFiles(TaskTracker.OUTPUT);
   }
 
   public void setConf(Configuration conf) {
@@ -177,9 +185,4 @@ class MapOutputFile {
       this.conf = new JobConf(conf);
     }
   }
-  
-  public void setJobId(JobID jobId) {
-    this.jobId = jobId;
-  }
-
 }

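All the MapOutputFile accessors above drop their job/task-id parameters:
paths are now relative to TaskTracker.OUTPUT, and LocalDirAllocator resolves
them against the task-specific mapred.local.dir carried by the conf. A
hedged usage sketch (variable names are illustrative; the size arguments are
hints that let the allocator pick a disk with enough room):

    MapOutputFile mapOutputFile = new MapOutputFile();
    mapOutputFile.setConf(localConf); // localConf carries the task's local dirs

    // Writing: the size hint guides LocalDirAllocator's disk choice.
    Path spill    = mapOutputFile.getSpillFileForWrite(numSpills, size);
    Path finalOut = mapOutputFile.getOutputFileForWrite(finalOutFileSize);

    // Reading back earlier outputs needs no ids either:
    Path out      = mapOutputFile.getOutputFile();
    Path spillIdx = mapOutputFile.getSpillIndexFile(numSpills - 1);
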
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar  4 03:42:01 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -77,7 +78,6 @@ class MapTask extends Task {
    * The size of each record in the index file for the map-outputs.
    */
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
-  
 
   private TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   private String splitClass;
@@ -106,12 +106,20 @@ class MapTask extends Task {
   }
 
   @Override
-  public void localizeConfiguration(JobConf conf) throws IOException {
+  public void localizeConfiguration(JobConf conf)
+      throws IOException {
     super.localizeConfiguration(conf);
+    // split.info file is used only by IsolationRunner.
+    // Write the split file to the local disk if it is a normal map task (not a
+    // job-setup or a job-cleanup task) and if the user wishes to run
+    // IsolationRunner either by setting keep.failed.tasks.files to true or by
+    // using keep.tasks.files.pattern
     if (supportIsolationRunner(conf) && isMapOrReduce()) {
       // localize the split meta-information
-      Path localSplitMeta = new Path(new Path(getJobFile()).getParent(), 
-                                 "split.info");
+      Path localSplitMeta =
+        new LocalDirAllocator("mapred.local.dir").getLocalPathForWrite(
+            TaskTracker.getLocalSplitFile(getJobID().toString(), getTaskID()
+                .toString()), conf);
       LOG.debug("Writing local split to " + localSplitMeta);
       DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
       splitMetaInfo.write(out);
@@ -1228,8 +1236,8 @@ class MapTask extends Task {
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
-        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
-            numSpills, size);
+        final Path filename =
+            mapOutputFile.getSpillFileForWrite(numSpills, size);
         out = rfs.create(filename);
 
         final int endPosition = (kvend > kvstart)
@@ -1293,9 +1301,9 @@ class MapTask extends Task {
 
         if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
           // create spill index file
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-              getTaskID(), numSpills,
-              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          Path indexFilename =
+              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);
@@ -1321,8 +1329,8 @@ class MapTask extends Task {
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
-        final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
-            numSpills, size);
+        final Path filename =
+            mapOutputFile.getSpillFileForWrite(numSpills, size);
         out = rfs.create(filename);
         
         // we don't run the combiner for a single record
@@ -1358,9 +1366,9 @@ class MapTask extends Task {
         }
         if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
           // create spill index file
-          Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
-              getTaskID(), numSpills,
-              partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+          Path indexFilename =
+              mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+                  * MAP_OUTPUT_INDEX_RECORD_LENGTH);
           spillRec.writeToFile(indexFilename, job);
         } else {
           indexCacheList.add(spillRec);
@@ -1450,14 +1458,14 @@ class MapTask extends Task {
       final TaskAttemptID mapId = getTaskID();
 
       for(int i = 0; i < numSpills; i++) {
-        filename[i] = mapOutputFile.getSpillFile(mapId, i);
+        filename[i] = mapOutputFile.getSpillFile(i);
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       if (numSpills == 1) { //the spill is the final output
         rfs.rename(filename[0],
             new Path(filename[0].getParent(), "file.out"));
         if (indexCacheList.size() == 0) {
-          rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
+          rfs.rename(mapOutputFile.getSpillIndexFile(0),
               new Path(filename[0].getParent(),"file.out.index"));
         } else {
           indexCacheList.get(0).writeToFile(
@@ -1468,7 +1476,7 @@ class MapTask extends Task {
 
       // read in paged indices
       for (int i = indexCacheList.size(); i < numSpills; ++i) {
-        Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
+        Path indexFileName = mapOutputFile.getSpillIndexFile(i);
         indexCacheList.add(new SpillRecord(indexFileName, job));
       }
 
@@ -1476,10 +1484,10 @@ class MapTask extends Task {
       //lengths for each partition
       finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
       finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-      Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
-                             finalOutFileSize);
-      Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
-                            mapId, finalIndexFileSize);
+      Path finalOutputFile =
+          mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+      Path finalIndexFile =
+          mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
 
       //The output stream for the final single output file
       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

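The localizeConfiguration() hunk moves the IsolationRunner split meta-file
from a fixed location next to job.xml to a LocalDirAllocator-managed path. A
minimal sketch of the new write path, assuming TaskTracker.getLocalSplitFile()
returns a mapred-local-relative name as the hunk suggests:

    LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");
    Path localSplitMeta = alloc.getLocalPathForWrite(
        TaskTracker.getLocalSplitFile(jobId, taskId), conf);
    DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta);
    try {
      splitMetaInfo.write(out); // same TaskSplitIndex serialization as above
    } finally {
      out.close();
    }
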
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java Fri Mar  4 03:42:01 2011
@@ -34,13 +34,13 @@ class MapTaskRunner extends TaskRunner {
       return false;
     }
     
-    mapOutputFile.removeAll(getTask().getTaskID());
+    mapOutputFile.removeAll();
     return true;
   }
 
   /** Delete all of the temporary map output files. */
   public void close() throws IOException {
     LOG.info(getTask()+" done; removing files.");
-    mapOutputFile.removeAll(getTask().getTaskID());
+    mapOutputFile.removeAll();
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 03:42:01 2011
@@ -213,7 +213,7 @@ class ReduceTask extends Task {
     if (isLocal) {
       // for local jobs
       for(int i = 0; i < numMaps; ++i) {
-        fileList.add(mapOutputFile.getInputFile(i, getTaskID()));
+        fileList.add(mapOutputFile.getInputFile(i));
       }
     } else {
       // for non local jobs
@@ -1287,12 +1287,11 @@ class ReduceTask extends Task {
         // else, we will check the localFS to find a suitable final location
         // for this path
         TaskAttemptID reduceId = reduceTask.getTaskID();
-        Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
-                                 reduceId.getJobID().toString(),
-                                 reduceId.toString()) 
-                                 + "/map_" +
-                                 loc.getTaskId().getId() + ".out");
-        
+        Path filename =
+            new Path(String.format(
+                MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
+                TaskTracker.OUTPUT, loc.getTaskId().getId()));
+
         // Copy the map output to a temp file whose name is unique to this attempt 
         Path tmpMapOutput = new Path(filename+"-"+id);
         
@@ -2350,8 +2349,8 @@ class ReduceTask extends Task {
         if (numMemDiskSegments > 0 &&
               ioSortFactor > mapOutputFilesOnDisk.size()) {
           // must spill to disk, but can't retain in-mem for intermediate merge
-          final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
-                            reduceTask.getTaskID(), inMemToDiskBytes);
+          final Path outputPath =
+              mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes);
           final RawKeyValueIterator rIter = Merger.merge(job, fs,
               keyClass, valueClass, memDiskSegments, numMemDiskSegments,
               tmpDir, comparator, reporter, spilledRecordsCounter, null);
@@ -2649,8 +2648,8 @@ class ReduceTask extends Task {
         long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
         int noInMemorySegments = inMemorySegments.size();
 
-        Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
-                          reduceTask.getTaskID(), mergeOutputSize);
+        Path outputPath =
+            mapOutputFile.getInputFileForWrite(mapId, mergeOutputSize);
 
         Writer writer = 
           new Writer(conf, rfs, outputPath,

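On the shuffle side, ReduceTask now derives map-output names from
MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING ("%s/map_%d.out") instead of
concatenating job and task ids by hand. A short sketch of the expansion; the
concrete value of TaskTracker.OUTPUT is defined outside this diff:

    // Map output 7 for this reduce resolves to "<TaskTracker.OUTPUT>/map_7.out".
    Path filename = new Path(String.format(
        MapOutputFile.REDUCE_INPUT_FILE_FORMAT_STRING,
        TaskTracker.OUTPUT, loc.getTaskId().getId()));
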
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Mar  4 03:42:01 2011
@@ -37,7 +37,7 @@ class ReduceTaskRunner extends TaskRunne
     }
     
     // cleanup from failures
-    mapOutputFile.removeAll(getTask().getTaskID());
+    mapOutputFile.removeAll();
     return true;
   }
   
@@ -46,6 +46,6 @@ class ReduceTaskRunner extends TaskRunne
   public void close() throws IOException {
     LOG.info(getTask()+" done; removing files.");
     getTask().getProgress().setStatus("closed");
-    mapOutputFile.removeAll(getTask().getTaskID());
+    mapOutputFile.removeAll();
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:42:01 2011
@@ -179,7 +179,6 @@ abstract public class Task implements Wr
                                                     TaskStatus.Phase.MAP : 
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
-    this.mapOutputFile.setJobId(taskId.getJobID());
     spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }
 
@@ -405,7 +404,6 @@ abstract public class Task implements Wr
     partition = in.readInt();
     numSlotsRequired = in.readInt();
     taskStatus.readFields(in);
-    this.mapOutputFile.setJobId(taskId.getJobID()); 
     skipRanges.readFields(in);
     currentRecIndexIterator = skipRanges.skipRangeIterator();
     currentRecStartIndex = currentRecIndexIterator.next();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar  4 03:42:01 2011
@@ -17,13 +17,17 @@
 */
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
@@ -45,27 +49,95 @@ abstract class TaskController implements
   public Configuration getConf() {
     return conf;
   }
-  
+
+  // The list of directory paths specified in the variable mapred.local.dir.
+  // This is used to determine which among the list of directories is picked up
+  // for storing data for a particular task.
+  protected String[] mapredLocalDirs;
+
   public void setConf(Configuration conf) {
     this.conf = conf;
+    mapredLocalDirs = conf.getStrings("mapred.local.dir");
   }
-  
+
+  /**
+   * Sets up the permissions of the following directories on all the configured
+   * disks:
+   * <ul>
+   * <li>mapred-local directories</li>
+   * <li>Job cache directories</li>
+   * <li>Archive directories</li>
+   * <li>Hadoop log directories</li>
+   * </ul>
+   */
+  void setup() {
+    for (String localDir : this.mapredLocalDirs) {
+      // Set up the mapred-local directories.
+      File mapredlocalDir = new File(localDir);
+      if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
+        LOG.warn("Unable to create mapred-local directory : "
+            + mapredlocalDir.getPath());
+      } else {
+        PermissionsHandler.setPermissions(mapredlocalDir,
+            PermissionsHandler.sevenFiveFive);
+      }
+
+      // Set up the cache directory used for distributed cache files
+      File distributedCacheDir =
+          new File(localDir, TaskTracker.getDistributedCacheDir());
+      if (!distributedCacheDir.exists() && !distributedCacheDir.mkdirs()) {
+        LOG.warn("Unable to create cache directory : "
+            + distributedCacheDir.getPath());
+      } else {
+        PermissionsHandler.setPermissions(distributedCacheDir,
+            PermissionsHandler.sevenFiveFive);
+      }
+
+      // Set up the jobcache directory
+      File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
+      if (!jobCacheDir.exists() && !jobCacheDir.mkdirs()) {
+        LOG.warn("Unable to create job cache directory : "
+            + jobCacheDir.getPath());
+      } else {
+        PermissionsHandler.setPermissions(jobCacheDir,
+            PermissionsHandler.sevenFiveFive);
+      }
+    }
+
+    // Set up the user log directory
+    File taskLog = TaskLog.getUserLogDir();
+    if (!taskLog.exists() && !taskLog.mkdirs()) {
+      LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
+    } else {
+      PermissionsHandler.setPermissions(taskLog,
+          PermissionsHandler.sevenFiveFive);
+    }
+  }
+
   /**
-   * Setup task controller component.
+   * Take task-controller specific actions to initialize job. This involves
+   * setting appropriate permissions to job-files so as to secure the files to
+   * be accessible only by the user's tasks.
    * 
+   * @throws IOException
    */
-  abstract void setup();
-  
-  
+  abstract void initializeJob(JobInitializationContext context) throws IOException;
+
   /**
    * Launch a task JVM
    * 
-   * This method defines how a JVM will be launched to run a task.
+   * This method defines how a JVM will be launched to run a task. Each
+   * task-controller should also do an
+   * {@link #initializeTask(TaskControllerContext)} inside this method so as to
+   * initialize the task before launching it. This is for reasons of
+   * task-controller specific optimizations w.r.t combining initialization and
+   * launching of tasks.
+   * 
    * @param context the context associated to the task
    */
   abstract void launchTaskJVM(TaskControllerContext context)
                                       throws IOException;
-  
+
   /**
    * Top level cleanup a task JVM method.
    *
@@ -90,47 +162,44 @@ abstract class TaskController implements
     }
     killTask(context);
   }
-  
-  /**
-   * Perform initializing actions required before a task can run.
-   * 
-   * For instance, this method can be used to setup appropriate
-   * access permissions for files and directories that will be
-   * used by tasks. Tasks use the job cache, log, PID and distributed cache
-   * directories and files as part of their functioning. Typically,
-   * these files are shared between the daemon and the tasks
-   * themselves. So, a TaskController that is launching tasks
-   * as different users can implement this method to setup
-   * appropriate ownership and permissions for these directories
-   * and files.
-   */
-  abstract void initializeTask(TaskControllerContext context);
-  
-  
+
+  /** Perform initializing actions required before a task can run.
+    * 
+    * For instance, this method can be used to setup appropriate
+    * access permissions for files and directories that will be
+    * used by tasks. Tasks use the job cache, log, and distributed cache
+    * directories and files as part of their functioning. Typically,
+    * these files are shared between the daemon and the tasks
+    * themselves. So, a TaskController that is launching tasks
+    * as different users can implement this method to setup
+    * appropriate ownership and permissions for these directories
+    * and files.
+    */
+  abstract void initializeTask(TaskControllerContext context)
+      throws IOException;
+
   /**
    * Contains task information required for the task controller.  
    */
   static class TaskControllerContext {
     // task being executed
-    Task task; 
-    // the JVM environment for the task
-    JvmEnv env;
-    // the Shell executor executing the JVM for this task
-    ShellCommandExecutor shExec; 
-    // process handle of task JVM
-    String pid;
-    // waiting time before sending SIGKILL to task JVM after sending SIGTERM
-    long sleeptimeBeforeSigkill;
+    Task task;
+    ShellCommandExecutor shExec;     // the Shell executor executing the JVM for this task.
+
+    // Information used only when this context is used for launching new tasks.
+    JvmEnv env;     // the JVM environment for the task.
+
+    // Information used only when this context is used for destroying a task jvm.
+    String pid; // process handle of task JVM.
+    long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
+  }
+
+  static class JobInitializationContext {
+    JobID jobid;
+    File workDir;
+    String user;
   }
 
-  /**
-   * Method which is called after the job is localized so that task controllers
-   * can implement their own job localization logic.
-   * 
-   * @param tip  Task of job for which localization happens.
-   */
-  abstract void initializeJob(JobID jobId);
-  
   /**
    * Sends a graceful terminate signal to taskJVM and it sub-processes. 
    *   
@@ -144,6 +213,5 @@ abstract class TaskController implements
    * 
    * @param context task context
    */
-  
   abstract void killTask(TaskControllerContext context);
 }

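The new TaskController.setup() repeats one pattern for every directory it
touches: create it if absent, then set it to 755 through
TaskTracker.PermissionsHandler so the daemon owns the tree while child tasks
can still traverse it. A hypothetical helper one could factor out of those
repeated blocks (a sketch, not part of the commit):

    // Create-if-absent, then chmod 755 (rwxr-xr-x): the daemon writes,
    // tasks read and traverse.
    private void setupDir(File dir) {
      if (!dir.exists() && !dir.mkdirs()) {
        LOG.warn("Unable to create directory : " + dir.getPath());
      } else {
        PermissionsHandler.setPermissions(dir, PermissionsHandler.sevenFiveFive);
      }
    }

Note the security posture this implies: the removed LinuxTaskController code
opened directories with ugo+rwx, whereas the shared setup() now defaults to
755 and defers per-job and per-task tightening to the setuid binary's new
INITIALIZE_JOB and INITIALIZE_TASK commands.
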
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar  4 03:42:01 2011
@@ -57,9 +57,10 @@ public class TaskLog {
   private static final Log LOG =
     LogFactory.getLog(TaskLog.class);
 
+  static final String USERLOGS_DIR_NAME = "userlogs";
+
   private static final File LOG_DIR = 
-    new File(System.getProperty("hadoop.log.dir"), 
-             "userlogs").getAbsoluteFile();
+    new File(getBaseLogDir(), USERLOGS_DIR_NAME).getAbsoluteFile();
   
   // localFS is set in (and used by) writeToIndexFile()
   static LocalFileSystem localFS = null;
@@ -178,7 +179,11 @@ public class TaskLog {
       return new File(getBaseDir(taskid), "log.index");
     }
   }
-  
+
+  static String getBaseLogDir() {
+    return System.getProperty("hadoop.log.dir");
+  }
+
   static File getBaseDir(String taskid) {
     return new File(LOG_DIR, taskid);
   }

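The TaskLog change is small but load-bearing: the base directory lookup is
factored into getBaseLogDir() and the "userlogs" literal into
USERLOGS_DIR_NAME, so other components can compose user-log paths without
re-reading the system property. The resulting resolution, equivalent to the
new LOG_DIR initializer:

    // ${hadoop.log.dir}/userlogs, via the factored-out pieces.
    File logDir = new File(TaskLog.getBaseLogDir(),
        TaskLog.USERLOGS_DIR_NAME).getAbsoluteFile();
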
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077111&r1=1077110&r2=1077111&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 03:42:01 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.TaskTracker.PermissionsHandler;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -78,7 +79,7 @@ abstract class TaskRunner extends Thread
     this.t = tip.getTask();
     this.tracker = tracker;
     this.conf = conf;
-    this.mapOutputFile = new MapOutputFile(t.getJobID());
+    this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
     this.jvmManager = tracker.getJvmManagerInstance();
   }
@@ -125,223 +126,41 @@ abstract class TaskRunner extends Thread
       
       URI[] archives = DistributedCache.getCacheArchives(conf);
       URI[] files = DistributedCache.getCacheFiles(conf);
+      // We don't create any symlinks yet, so whether workDir actually exists
+      // on the file system doesn't matter.
       setupDistributedCache(lDirAlloc, workDir, archives, files);
-          
+      
+      // Set up the child task's configuration. After this call, no localization
+      // of files should happen in the TaskTracker's process space. Any changes to
+      // the conf object after this will NOT be reflected to the child.
+      setupChildTaskConfiguration(lDirAlloc);
+      
       if (!prepare()) {
         return;
       }
-
-      // Accumulates class paths for child.
-      List<String> classPaths = new ArrayList<String>();
-      // start with same classpath as parent process
-      appendSystemClasspaths(classPaths);
-
-      if (!workDir.mkdirs()) {
-        if (!workDir.isDirectory()) {
-          LOG.fatal("Mkdirs failed to create " + workDir.toString());
-        }
-      }
-
-      // include the user specified classpath
-      appendJobJarClasspaths(conf.getJar(), classPaths);
-  		
-      // Distributed cache paths
-      appendDistributedCacheClasspaths(conf, archives, files, classPaths);
-      
-      // Include the working dir too
-      classPaths.add(workDir.toString());
       
       // Build classpath
+      List<String> classPaths = getClassPaths(conf, workDir, archives, files);
       
+      long logSize = TaskLog.getTaskLogLength(conf);
       
       //  Build exec child JVM args.
-      Vector<String> vargs = new Vector<String>(8);
-      File jvm =                                  // use same jvm as parent
-        new File(new File(System.getProperty("java.home"), "bin"), "java");
-
-      vargs.add(jvm.toString());
-
-      // Add child (task) java-vm options.
-      //
-      // The following symbols if present in mapred.child.java.opts value are
-      // replaced:
-      // + @taskid@ is interpolated with value of TaskID.
-      // Other occurrences of @ will not be altered.
-      //
-      // Example with multiple arguments and substitutions, showing
-      // jvm GC logging, and start of a passwordless JVM JMX agent so can
-      // connect with jconsole and the likes to watch child memory, threads
-      // and get thread dumps.
-      //
-      //  <property>
-      //    <name>mapred.child.java.opts</name>
-      //    <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
-      //           -Dcom.sun.management.jmxremote.authenticate=false \
-      //           -Dcom.sun.management.jmxremote.ssl=false \
-      //    </value>
-      //  </property>
-      //
-      String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
-      javaOpts = javaOpts.replace("@taskid@", taskid.toString());
-      String [] javaOptsSplit = javaOpts.split(" ");
-      
-      // Add java.library.path; necessary for loading native libraries.
-      //
-      // 1. To support native-hadoop library i.e. libhadoop.so, we add the 
-      //    parent processes' java.library.path to the child. 
-      // 2. We also add the 'cwd' of the task to it's java.library.path to help 
-      //    users distribute native libraries via the DistributedCache.
-      // 3. The user can also specify extra paths to be added to the 
-      //    java.library.path via mapred.child.java.opts.
-      //
-      String libraryPath = System.getProperty("java.library.path");
-      if (libraryPath == null) {
-        libraryPath = workDir.getAbsolutePath();
-      } else {
-        libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
-      }
-      boolean hasUserLDPath = false;
-      for(int i=0; i<javaOptsSplit.length ;i++) { 
-        if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
-          javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
-          hasUserLDPath = true;
-          break;
-        }
-      }
-      if(!hasUserLDPath) {
-        vargs.add("-Djava.library.path=" + libraryPath);
-      }
-      for (int i = 0; i < javaOptsSplit.length; i++) {
-        vargs.add(javaOptsSplit[i]);
-      }
-
-      // add java.io.tmpdir given by mapred.child.tmp
-      String tmp = conf.get("mapred.child.tmp", "./tmp");
-      Path tmpDir = new Path(tmp);
-      
-      // if temp directory path is not absolute 
-      // prepend it with workDir.
-      if (!tmpDir.isAbsolute()) {
-        tmpDir = new Path(workDir.toString(), tmp);
-      }
-      FileSystem localFs = FileSystem.getLocal(conf);
-      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
-        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
-      }
-      vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
-
-      // Add classpath.
-      vargs.add("-classpath");
-      String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
-      vargs.add(classPath);
-
-      // Setup the log4j prop
-      long logSize = TaskLog.getTaskLogLength(conf);
-      vargs.add("-Dhadoop.log.dir=" + 
-          new File(System.getProperty("hadoop.log.dir")
-          ).getAbsolutePath());
-      vargs.add("-Dhadoop.root.logger=INFO,TLA");
-      vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
-      vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
-
-      if (conf.getProfileEnabled()) {
-        if (conf.getProfileTaskRange(t.isMapTask()
-                                     ).isIncluded(t.getPartition())) {
-          File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
-          vargs.add(String.format(conf.getProfileParams(), prof.toString()));
-        }
-      }
-
-      // Add main class and its arguments 
-      vargs.add(Child.class.getName());  // main of Child
-      // pass umbilical address
-      InetSocketAddress address = tracker.getTaskTrackerReportAddress();
-      vargs.add(address.getAddress().getHostAddress()); 
-      vargs.add(Integer.toString(address.getPort())); 
-      vargs.add(taskid.toString());                      // pass task identifier
+      Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);
       
       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
-      String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
-      List<String> setup = null;
-      if (ulimitCmd != null) {
-        setup = new ArrayList<String>();
-        for (String arg : ulimitCmd) {
-          setup.add(arg);
-        }
-      }
-
+      List<String> setup = getVMSetupCmd();
       // Set up the redirection of the task's stdout and stderr streams
-      File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
-      File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
-      stdout.getParentFile().mkdirs();
-      tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
-
+      File[] logFiles = prepareLogFiles(taskid);
+      File stdout = logFiles[0];
+      File stderr = logFiles[1];
+      tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
+                 stderr);
+      
       Map<String, String> env = new HashMap<String, String>();
-      StringBuffer ldLibraryPath = new StringBuffer();
-      ldLibraryPath.append(workDir.toString());
-      String oldLdLibraryPath = null;
-      oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
-      if (oldLdLibraryPath != null) {
-        ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
-        ldLibraryPath.append(oldLdLibraryPath);
-      }
-      env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
-
-      String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
-      LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
-      env.put("JOB_TOKEN_FILE", jobTokenFile);
-
-      // for the child of task jvm, set hadoop.root.logger
-      env.put("HADOOP_ROOT_LOGGER","INFO,TLA");
-      String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
-      if (hadoopClientOpts == null) {
-        hadoopClientOpts = "";
-      } else {
-        hadoopClientOpts = hadoopClientOpts + " ";
-      }
-      hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
-                         + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
-      env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
-      
-      // add the env variables passed by the user
-      String mapredChildEnv = conf.get("mapred.child.env");
-      if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
-        String childEnvs[] = mapredChildEnv.split(",");
-        for (String cEnv : childEnvs) {
-          try {
-            String[] parts = cEnv.split("="); // split on '='
-            String value = env.get(parts[0]);
-            if (value != null) {
-              // replace $env with the child's env constructed by tt's
-              // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
-              value = parts[1].replace("$" + parts[0], value);
-            } else {
-              // this key is not configured by the tt for the child .. get it 
-              // from the tt's env
-              // example PATH=$PATH:/tmp
-              value = System.getenv(parts[0]);
-              if (value != null) {
-                // the env key is present in the tt's env
-                value = parts[1].replace("$" + parts[0], value);
-              } else {
-                // the env key is note present anywhere .. simply set it
-                // example X=$X:/tmp or X=/tmp
-                value = parts[1].replace("$" + parts[0], "");
-              }
-            }
-            env.put(parts[0], value);
-          } catch (Throwable t) {
-            // set the error msg
-            errorInfo = "Invalid User environment settings : " + mapredChildEnv 
-                        + ". Failed to parse user-passed environment param."
-                        + " Expecting : env1=value1,env2=value2...";
-            LOG.warn(errorInfo);
-            throw t;
-          }
-        }
-      }
+      errorInfo = getVMEnvironment(errorInfo, workDir, conf, env, taskid,
+                                   logSize);
 
       jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
@@ -369,7 +188,7 @@ abstract class TaskRunner extends Thread
         LOG.fatal(t.getTaskID()+" reporting FSError", ie);
       }
     } catch (Throwable throwable) {
-      LOG.warn(t.getTaskID() + errorInfo, throwable);
+      LOG.warn(t.getTaskID() + " : " + errorInfo, throwable);
       Throwable causeThrowable = new Throwable(errorInfo, throwable);
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       causeThrowable.printStackTrace(new PrintStream(baos));
@@ -404,15 +223,343 @@ abstract class TaskRunner extends Thread
     }
   }
 
+  /**
+   * Prepare the log files for the task.
+   * 
+   * @param taskid the task attempt whose log files are being prepared
+   * @return an array of files. The first file is stdout, the second is stderr.
+   */
+  static File[] prepareLogFiles(TaskAttemptID taskid) {
+    File[] logFiles = new File[2];
+    logFiles[0] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
+    logFiles[1] = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
+    File logDir = logFiles[0].getParentFile();
+    boolean b = logDir.mkdirs();
+    if (!b) {
+      LOG.warn("mkdirs failed. Ignoring");
+    } else {
+      PermissionsHandler.setPermissions(logDir,
+          PermissionsHandler.sevenZeroZero);
+    }
+    return logFiles;
+  }
+
+  /**
+   * Write the child's configuration to disk and point the task at it so that
+   * the child can pick it up from there.
+   * 
+   * @param lDirAlloc allocator used to choose a local directory for the file
+   * @throws IOException
+   */
+  void setupChildTaskConfiguration(LocalDirAllocator lDirAlloc)
+      throws IOException {
+
+    Path localTaskFile =
+        lDirAlloc.getLocalPathForWrite(TaskTracker.getTaskConfFile(t
+            .getJobID().toString(), t.getTaskID().toString(), t
+            .isTaskCleanupTask()), conf);
+
+    // write the child's task configuration file to the local disk
+    writeLocalTaskFile(localTaskFile.toString(), conf);
+
+    // Set the final job file in the task. The child needs to know the correct
+    // path to job.xml. So set this path accordingly.
+    t.setJobFile(localTaskFile.toString());
+  }
+
+  /**
+   * @return the setup commands (currently the ulimit memory command, if any)
+   *         to run before exec'ing the child JVM, or null if none are needed
+   */
+  private List<String> getVMSetupCmd() {
+    String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
+    List<String> setup = null;
+    if (ulimitCmd != null) {
+      setup = new ArrayList<String>();
+      for (String arg : ulimitCmd) {
+        setup.add(arg);
+      }
+    }
+    return setup;
+  }
+
+  /**
+   * Build the argument vector for the child JVM.
+   * 
+   * @param taskid the task attempt being launched
+   * @param workDir the task's working directory
+   * @param classPaths classpath entries for the child JVM
+   * @param logSize maximum size of the task's log files
+   * @return the full command-line argument vector for the child JVM
+   * @throws IOException if the child's temp directory cannot be created
+   */
+  private Vector<String> getVMArgs(TaskAttemptID taskid, File workDir,
+      List<String> classPaths, long logSize)
+      throws IOException {
+    Vector<String> vargs = new Vector<String>(8);
+    File jvm =                                  // use same jvm as parent
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+
+    vargs.add(jvm.toString());
+
+    // Add child (task) java-vm options.
+    //
+    // The following symbols if present in mapred.child.java.opts value are
+    // replaced:
+    // + @taskid@ is interpolated with value of TaskID.
+    // Other occurrences of @ will not be altered.
+    //
+    // Example with multiple arguments and substitutions, showing
+    // jvm GC logging, and start of a passwordless JVM JMX agent so can
+    // connect with jconsole and the likes to watch child memory, threads
+    // and get thread dumps.
+    //
+    //  <property>
+    //    <name>mapred.child.java.opts</name>
+    //    <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
+    //           -Dcom.sun.management.jmxremote.authenticate=false \
+    //           -Dcom.sun.management.jmxremote.ssl=false \
+    //    </value>
+    //  </property>
+    //
+    String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
+    javaOpts = javaOpts.replace("@taskid@", taskid.toString());
+    String [] javaOptsSplit = javaOpts.split(" ");
+    
+    // Add java.library.path; necessary for loading native libraries.
+    //
+    // 1. To support the native-hadoop library, i.e. libhadoop.so, we add the
+    //    parent process's java.library.path to the child.
+    // 2. We also add the 'cwd' of the task to its java.library.path to help
+    //    users distribute native libraries via the DistributedCache.
+    // 3. The user can also specify extra paths to be added to the 
+    //    java.library.path via mapred.child.java.opts.
+    //
+    String libraryPath = System.getProperty("java.library.path");
+    if (libraryPath == null) {
+      libraryPath = workDir.getAbsolutePath();
+    } else {
+      libraryPath += SYSTEM_PATH_SEPARATOR + workDir;
+    }
+    boolean hasUserLDPath = false;
+    for(int i=0; i<javaOptsSplit.length ;i++) { 
+      if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
+        javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
+        hasUserLDPath = true;
+        break;
+      }
+    }
+    if(!hasUserLDPath) {
+      vargs.add("-Djava.library.path=" + libraryPath);
+    }
+    for (int i = 0; i < javaOptsSplit.length; i++) {
+      vargs.add(javaOptsSplit[i]);
+    }
+
+    Path childTmpDir = createChildTmpDir(workDir, conf);
+    vargs.add("-Djava.io.tmpdir=" + childTmpDir);
+
+    // Add classpath.
+    vargs.add("-classpath");
+    String classPath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
+    vargs.add(classPath);
+
+    // Setup the log4j prop
+    vargs.add("-Dhadoop.log.dir=" + 
+        new File(System.getProperty("hadoop.log.dir")
+        ).getAbsolutePath());
+    vargs.add("-Dhadoop.root.logger=INFO,TLA");
+    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
+    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
+
+    if (conf.getProfileEnabled()) {
+      if (conf.getProfileTaskRange(t.isMapTask()
+                                   ).isIncluded(t.getPartition())) {
+        File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
+        vargs.add(String.format(conf.getProfileParams(), prof.toString()));
+      }
+    }
+
+    // Add main class and its arguments 
+    vargs.add(Child.class.getName());  // main of Child
+    // pass umbilical address
+    InetSocketAddress address = tracker.getTaskTrackerReportAddress();
+    vargs.add(address.getAddress().getHostAddress()); 
+    vargs.add(Integer.toString(address.getPort())); 
+    vargs.add(taskid.toString());                      // pass task identifier
+    return vargs;
+  }
+
+  /**
+   * Create the temp directory (java.io.tmpdir) for the child JVM.
+   * 
+   * @param workDir the task's working directory
+   * @param conf the job configuration
+   * @return the child's temp directory
+   * @throws IOException if the directory cannot be created
+   */
+  static Path createChildTmpDir(File workDir,
+      JobConf conf)
+      throws IOException {
+
+    // add java.io.tmpdir given by mapred.child.tmp
+    String tmp = conf.get("mapred.child.tmp", "./tmp");
+    Path tmpDir = new Path(tmp);
+
+    // if temp directory path is not absolute, prepend it with workDir.
+    if (!tmpDir.isAbsolute()) {
+      tmpDir = new Path(workDir.toString(), tmp);
+
+      FileSystem localFs = FileSystem.getLocal(conf);
+      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
+        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+      }
+    }
+    return tmpDir;
+  }
+
+  /**
+   * Assemble the classpath for the child JVM: the system classpath, the job
+   * jar, distributed-cache entries, and the working directory.
+   */
+  private static List<String> getClassPaths(JobConf conf, File workDir,
+      URI[] archives, URI[] files)
+      throws IOException {
+    // Accumulates class paths for child.
+    List<String> classPaths = new ArrayList<String>();
+    // start with same classpath as parent process
+    appendSystemClasspaths(classPaths);
+
+    // include the user specified classpath
+    appendJobJarClasspaths(conf.getJar(), classPaths);
+    
+    // Distributed cache paths
+    appendDistributedCacheClasspaths(conf, archives, files, classPaths);
+    
+    // Include the working dir too
+    classPaths.add(workDir.toString());
+    return classPaths;
+  }
+
+  /**
+   * Construct the environment for the child JVM.
+   * 
+   * @param errorInfo error message, updated and returned if the user-supplied
+   *          environment cannot be parsed
+   * @param workDir the task's working directory
+   * @param conf the job configuration
+   * @param env map to populate with the child's environment variables
+   * @param taskid the task attempt being launched
+   * @param logSize maximum size of the task's log files
+   * @return the (possibly updated) error message
+   * @throws Throwable if mapred.child.env cannot be parsed
+   */
+  private static String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
+      Map<String, String> env, TaskAttemptID taskid, long logSize)
+      throws Throwable {
+    StringBuffer ldLibraryPath = new StringBuffer();
+    ldLibraryPath.append(workDir.toString());
+    String oldLdLibraryPath = null;
+    oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
+    if (oldLdLibraryPath != null) {
+      ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
+      ldLibraryPath.append(oldLdLibraryPath);
+    }
+    env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+
+    String jobTokenFile = conf.get(JobContext.JOB_TOKEN_FILE);
+    LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
+    env.put("JOB_TOKEN_FILE", jobTokenFile);
+    
+    // for the child of task jvm, set hadoop.root.logger
+    env.put("HADOOP_ROOT_LOGGER","INFO,TLA");
+    String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
+    if (hadoopClientOpts == null) {
+      hadoopClientOpts = "";
+    } else {
+      hadoopClientOpts = hadoopClientOpts + " ";
+    }
+    hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
+                       + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+    env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\"");
+
+    // add the env variables passed by the user
+    String mapredChildEnv = conf.get("mapred.child.env");
+    if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
+      String childEnvs[] = mapredChildEnv.split(",");
+      for (String cEnv : childEnvs) {
+        try {
+          String[] parts = cEnv.split("="); // split on '='
+          String value = env.get(parts[0]);
+          if (value != null) {
+            // replace $env with the child's env value constructed by the TT
+            // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
+            value = parts[1].replace("$" + parts[0], value);
+          } else {
+            // this key is not configured by the TT for the child; get it
+            // from the TT's env
+            // example PATH=$PATH:/tmp
+            value = System.getenv(parts[0]);
+            if (value != null) {
+              // the env key is present in the tt's env
+              value = parts[1].replace("$" + parts[0], value);
+            } else {
+              // the env key is not present anywhere; simply set it
+              // example X=$X:/tmp or X=/tmp
+              value = parts[1].replace("$" + parts[0], "");
+            }
+          }
+          env.put(parts[0], value);
+        } catch (Throwable t) {
+          // set the error msg
+          errorInfo = "Invalid User environment settings : " + mapredChildEnv 
+                      + ". Failed to parse user-passed environment param."
+                      + " Expecting : env1=value1,env2=value2...";
+          LOG.warn(errorInfo);
+          throw t;
+        }
+      }
+    }
+    return errorInfo;
+  }
+
+  /**
+   * Write the task-specific job-configuration file.
+   * 
+   * @param jobFile path on the local file system to write the configuration to
+   * @param conf the configuration to write
+   * @throws IOException
+   */
+  private static void writeLocalTaskFile(String jobFile, JobConf conf)
+      throws IOException {
+    Path localTaskFile = new Path(jobFile);
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(localTaskFile, true);
+    OutputStream out = localFs.create(localTaskFile);
+    try {
+      conf.writeXml(out);
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Prepare mapred.local.dir for the child. The child is sandboxed now:
+   * whenever it uses LocalDirAllocator from this point on, it will only see
+   * files inside the attempt directory. This is done in the Child's process
+   * space.
+   */
+  static void setupChildMapredLocalDirs(Task t, JobConf conf) {
+    String[] localDirs = conf.getStrings("mapred.local.dir");
+    String jobId = t.getJobID().toString();
+    String taskId = t.getTaskID().toString();
+    boolean isCleanup = t.isTaskCleanupTask();
+    StringBuffer childMapredLocalDir =
+        new StringBuffer(localDirs[0] + Path.SEPARATOR
+            + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+    for (int i = 1; i < localDirs.length; i++) {
+      childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+          + TaskTracker.getLocalTaskDir(jobId, taskId, isCleanup));
+    }
+    LOG.debug("mapred.local.dir for child : " + childMapredLocalDir);
+    conf.set("mapred.local.dir", childMapredLocalDir.toString());
+  }
+
   /** Creates the working directory pathname for a task attempt. */ 
   static File formWorkDir(LocalDirAllocator lDirAlloc, 
       TaskAttemptID task, boolean isCleanup, JobConf conf) 
       throws IOException {
-    File workDir = new File(lDirAlloc.getLocalPathToRead(
-        TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
-          task.toString(), isCleanup) 
-        + Path.SEPARATOR + MRConstants.WORKDIR, conf).toString());
-    return workDir;
+    Path workDir =
+        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
+            .getJobID().toString(), task.toString(), isCleanup), conf);
+
+    return new File(workDir.toString());
   }
 
   private void setupDistributedCache(LocalDirAllocator lDirAlloc, File workDir,
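
The getVMEnvironment method extracted above preserves the mapred.child.env substitution rules: a $NAME reference expands to the value the TaskTracker already set for the child, else to the TaskTracker's own environment, else to the empty string. An illustrative configuration (the values below are made up):

    // With LD_LIBRARY_PATH already set by the TT for the child, and X set nowhere:
    conf.set("mapred.child.env",
        "LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/native,X=$X:/tmp");
    // Child sees: LD_LIBRARY_PATH = <TT-constructed value>:/opt/native
    //             X               = :/tmp   ($X expanded to "")
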
@@ -431,7 +578,7 @@ abstract class TaskRunner extends Thread
           fileStatus = fileSystem.getFileStatus(
                                     new Path(archives[i].getPath()));
           String cacheId = DistributedCache.makeRelative(archives[i],conf);
-          String cachePath = TaskTracker.getCacheSubdir() + 
+          String cachePath = TaskTracker.getDistributedCacheDir() + 
                                Path.SEPARATOR + cacheId;
           
           localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -457,7 +604,7 @@ abstract class TaskRunner extends Thread
           fileStatus = fileSystem.getFileStatus(
                                     new Path(files[i].getPath()));
           String cacheId = DistributedCache.makeRelative(files[i], conf);
-          String cachePath = TaskTracker.getCacheSubdir() +
+          String cachePath = TaskTracker.getDistributedCacheDir() +
                                Path.SEPARATOR + cacheId;
           
           localPath = lDirAlloc.getLocalPathForWrite(cachePath,
@@ -474,20 +621,12 @@ abstract class TaskRunner extends Thread
         }
         DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
       }
-      Path localTaskFile = new Path(t.getJobFile());
-      FileSystem localFs = FileSystem.getLocal(conf);
-      localFs.delete(localTaskFile, true);
-      OutputStream out = localFs.create(localTaskFile);
-      try {
-        conf.writeXml(out);
-      } finally {
-        out.close();
-      }
     }
   }
 
-  private void appendDistributedCacheClasspaths(JobConf conf, URI[] archives, 
-      URI[] files, List<String> classPaths) throws IOException {
+  private static void appendDistributedCacheClasspaths(JobConf conf,
+      URI[] archives, URI[] files, List<String> classPaths)
+      throws IOException {
     // Archive paths
     Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
     if (archiveClasspaths != null && archives != null) {
@@ -522,8 +661,9 @@ abstract class TaskRunner extends Thread
     }
   }
 
-  private void appendSystemClasspaths(List<String> classPaths) {
-    for (String c : System.getProperty("java.class.path").split(SYSTEM_PATH_SEPARATOR)) {
+  private static void appendSystemClasspaths(List<String> classPaths) {
+    for (String c : System.getProperty("java.class.path").split(
+        SYSTEM_PATH_SEPARATOR)) {
       classPaths.add(c);
     }
   }
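
setupChildMapredLocalDirs (added above) rewrites mapred.local.dir so that the child's LocalDirAllocator is confined to this attempt's directories. A hedged usage sketch; the exact layout returned by TaskTracker.getLocalTaskDir is not shown in this hunk:

    // Hypothetical child-side sketch:
    JobConf childConf = new JobConf(new Path(localJobFile)); // task-localized job.xml
    TaskRunner.setupChildMapredLocalDirs(task, childConf);
    // For mapred.local.dir = "/disk1/mapred,/disk2/mapred" the value becomes
    //   /disk1/mapred/<localTaskDir>,/disk2/mapred/<localTaskDir>
    // so LocalDirAllocator can only hand out paths under this attempt.
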
@@ -605,19 +745,8 @@ abstract class TaskRunner extends Thread
       // Do not exit even if symlinks have not been created.
       LOG.warn(StringUtils.stringifyException(ie));
     }
-    // add java.io.tmpdir given by mapred.child.tmp
-    String tmp = conf.get("mapred.child.tmp", "./tmp");
-    Path tmpDir = new Path(tmp);
 
-    // if temp directory path is not absolute
-    // prepend it with workDir.
-    if (!tmpDir.isAbsolute()) {
-      tmpDir = new Path(workDir.toString(), tmp);
-      FileSystem localFs = FileSystem.getLocal(conf);
-      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
-        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
-      }
-    }
+    createChildTmpDir(workDir, conf);
   }
 
   /**