You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:43:35 UTC

svn commit: r1077679 [4/6] - in /hadoop/common/branches/branch-0.20-security-patches: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/core/org/apache/hadoop/fs/ src/core/org/apa...

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Fri Mar  4 04:43:33 2011
@@ -17,27 +17,23 @@
 */
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.PrintWriter;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ProcessTree.Signal;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A {@link TaskController} that runs the task JVMs as the user 
@@ -62,22 +58,22 @@ class LinuxTaskController extends TaskCo
 
   private static final Log LOG = 
             LogFactory.getLog(LinuxTaskController.class);
-
-  // Name of the executable script that will contain the child
-  // JVM command line. See writeCommand for details.
-  private static final String COMMAND_FILE = "taskjvm.sh";
   
   // Path to the setuid executable.
-  private static String taskControllerExe;
+  private String taskControllerExe;
+  private static final String TASK_CONTROLLER_EXEC_KEY =
+    "mapreduce.tasktracker.task-controller.exe";
   
-  static {
-    // the task-controller is expected to be under the $HADOOP_HOME/bin
-    // directory.
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
     File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
-    taskControllerExe = 
+    String defaultTaskController = 
         new File(hadoopBin, "task-controller").getAbsolutePath();
+    taskControllerExe = conf.get(TASK_CONTROLLER_EXEC_KEY, 
+                                 defaultTaskController);       
   }
-  
+
   public LinuxTaskController() {
     super();
   }
@@ -85,27 +81,49 @@ class LinuxTaskController extends TaskCo
   /**
    * List of commands that the setuid script will execute.
    */
-  enum TaskControllerCommands {
-    INITIALIZE_USER,
-    INITIALIZE_JOB,
-    INITIALIZE_DISTRIBUTEDCACHE_FILE,
-    LAUNCH_TASK_JVM,
-    INITIALIZE_TASK,
-    TERMINATE_TASK_JVM,
-    KILL_TASK_JVM,
-    ENABLE_TASK_FOR_CLEANUP,
-    ENABLE_JOB_FOR_CLEANUP
+  enum Commands {
+    INITIALIZE_JOB(0),
+    LAUNCH_TASK_JVM(1),
+    SIGNAL_TASK(2),
+    DELETE_AS_USER(3),
+    DELETE_LOG_AS_USER(4);
+
+    private int value;
+    Commands(int value) {
+      this.value = value;
+    }
+    int getValue() {
+      return value;
+    }
+  }
+
+  /**
+   * Result codes returned from the C task-controller.
+   * These must match the values in task-controller.h.
+   */
+  enum ResultCode {
+    OK(0),
+    INVALID_USER_NAME(2),
+    INVALID_TASK_PID(9),
+    INVALID_CONFIG_FILE(24);
+
+    private final int value;
+    ResultCode(int value) {
+      this.value = value;
+    }
+    int getValue() {
+      return value;
+    }
   }
 
   @Override
-  public void setup() throws IOException {
-    super.setup();
+  public void setup(LocalDirAllocator allocator) throws IOException {
 
     // Check the permissions of the task-controller binary by running it plainly.
     // If permissions are correct, it returns an error code 1, else it returns
     // 24 or something else if some other bugs are also present.
     String[] taskControllerCmd =
-        new String[] { getTaskControllerExecutablePath() };
+        new String[] { taskControllerExe };
     ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
     try {
       shExec.execute();
@@ -118,50 +136,97 @@ class LinuxTaskController extends TaskCo
           + "permissions/ownership with exit code " + exitCode, e);
       }
     }
+    this.allocator = allocator;
   }
+  
 
-  /**
-   * Launch a task JVM that will run as the owner of the job.
-   * 
-   * This method launches a task JVM by executing a setuid executable that will
-   * switch to the user and run the task. Also does initialization of the first
-   * task in the same setuid process launch.
-   */
   @Override
-  void launchTaskJVM(TaskController.TaskControllerContext context) 
-                                        throws IOException {
-    JvmEnv env = context.env;
-    // get the JVM command line.
-    String cmdLine = 
-      TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
-          env.logSize, true);
-
-    StringBuffer sb = new StringBuffer();
-    //export out all the environment variable before child command as
-    //the setuid/setgid binaries would not be getting, any environmental
-    //variables which begin with LD_*.
-    for(Entry<String, String> entry : env.env.entrySet()) {
-      sb.append("export ");
-      sb.append(entry.getKey());
-      sb.append("=");
-      sb.append(entry.getValue());
-      sb.append("\n");
+  public void initializeJob(String user, String jobid, Path credentials,
+                            Path jobConf, TaskUmbilicalProtocol taskTracker
+                            ) throws IOException {
+    List<String> command = new ArrayList<String>(
+      Arrays.asList(taskControllerExe, 
+                    user, 
+                    Integer.toString(Commands.INITIALIZE_JOB.getValue()),
+                    jobid,
+                    credentials.toUri().getPath().toString(),
+                    jobConf.toUri().getPath().toString()));
+    File jvm =                                  // use same jvm as parent
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+    command.add(jvm.toString());
+    command.add("-classpath");
+    command.add(System.getProperty("java.class.path"));
+    command.add("-Dhadoop.log.dir=" + TaskLog.getBaseLogDir());
+    command.add("-Dhadoop.root.logger=INFO,console");
+    command.add(JobLocalizer.class.getName());  // main of JobLocalizer
+    command.add(user);
+    command.add(jobid);
+    // add the task tracker's reporting address
+    InetSocketAddress ttAddr = 
+      ((TaskTracker) taskTracker).getTaskTrackerReportAddress();
+    command.add(ttAddr.getHostName());
+    command.add(Integer.toString(ttAddr.getPort()));
+    String[] commandArray = command.toArray(new String[0]);
+    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("initializeJob: " + Arrays.toString(commandArray));
     }
-    sb.append(cmdLine);
-    // write the command to a file in the
-    // task specific cache directory
-    writeCommand(sb.toString(), getTaskCacheDirectory(context));
-    
-    // Call the taskcontroller with the right parameters.
-    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
-    ShellCommandExecutor shExec =  buildTaskControllerExecutor(
-                                    TaskControllerCommands.LAUNCH_TASK_JVM, 
-                                    env.conf.getUser(),
-                                    launchTaskJVMArgs, env.workDir, env.env);
-    context.shExec = shExec;
     try {
       shExec.execute();
+      if (LOG.isDebugEnabled()) {
+        logOutput(shExec.getOutput());
+      }
+    } catch (ExitCodeException e) {
+      int exitCode = shExec.getExitCode();
+      logOutput(shExec.getOutput());
+      throw new IOException("Job initialization failed (" + exitCode + ")", e);
+    }
+  }
+
+  @Override
+  public int launchTask(String user, 
+                                  String jobId,
+                                  String attemptId,
+                                  List<String> setup,
+                                  List<String> jvmArguments,
+                                  File currentWorkDirectory,
+                                  String stdout,
+                                  String stderr) throws IOException {
+
+    ShellCommandExecutor shExec = null;
+    try {
+      FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
+      long logSize = 0; //TODO, Ref BUG:2854624
+      // get the JVM command line.
+      String cmdLine = 
+        TaskLog.buildCommandLine(setup, jvmArguments,
+            new File(stdout), new File(stderr), logSize, true);
+
+      // write the command to a file in the
+      // task specific cache directory
+      Path p = new Path(allocator.getLocalPathForWrite(
+          TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
+          getConf()), COMMAND_FILE);
+      String commandFile = writeCommand(cmdLine, rawFs, p); 
+
+      String[] command = 
+        new String[]{taskControllerExe, 
+          user,
+          Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
+          jobId,
+          attemptId,
+          currentWorkDirectory.toString(),
+          commandFile};
+      shExec = new ShellCommandExecutor(command);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("launchTask: " + Arrays.toString(command));
+      }
+      shExec.execute();
     } catch (Exception e) {
+      if (shExec == null) {
+        return -1;
+      }
       int exitCode = shExec.getExitCode();
       LOG.warn("Exit code from task is : " + exitCode);
       // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
@@ -173,445 +238,56 @@ class LinuxTaskController extends TaskCo
         LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
         logOutput(shExec.getOutput());
       }
-      throw new IOException(e);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
-      logOutput(shExec.getOutput());
-    }
-  }
-
-  /**
-   * Helper method that runs a LinuxTaskController command
-   * 
-   * @param taskControllerCommand
-   * @param user
-   * @param cmdArgs
-   * @param env
-   * @throws IOException
-   */
-  private void runCommand(TaskControllerCommands taskControllerCommand, 
-      String user, List<String> cmdArgs, File workDir, Map<String, String> env)
-      throws IOException {
-
-    ShellCommandExecutor shExec =
-        buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs, 
-                                    workDir, env);
-    try {
-      shExec.execute();
-    } catch (Exception e) {
-      LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
-          + shExec.getExitCode());
-      LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
-          + StringUtils.stringifyException(e));
-      LOG.info("Output from LinuxTaskController's " 
-               + taskControllerCommand.toString() + " follows:");
-      logOutput(shExec.getOutput());
-      throw new IOException(e);
+      return exitCode;
     }
     if (LOG.isDebugEnabled()) {
-      LOG.info("Output from LinuxTaskController's " 
-               + taskControllerCommand.toString() + " follows:");
+      LOG.debug("Output from LinuxTaskController's launchTask follows:");
       logOutput(shExec.getOutput());
     }
-  }
-
-  /**
-   * Returns list of arguments to be passed while initializing a new task. See
-   * {@code buildTaskControllerExecutor(TaskControllerCommands, String, 
-   * List<String>, JvmEnv)} documentation.
-   * 
-   * @param context
-   * @return Argument to be used while launching Task VM
-   */
-  private List<String> buildInitializeTaskArgs(TaskControllerContext context) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    String taskId = context.task.getTaskID().toString();
-    String jobId = getJobId(context);
-    commandArgs.add(jobId);
-    if (!context.task.isTaskCleanupTask()) {
-      commandArgs.add(taskId);
-    } else {
-      commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
-    }
-    return commandArgs;
+    return 0;
   }
 
   @Override
-  void initializeTask(TaskControllerContext context)
-      throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to do " 
-                + TaskControllerCommands.INITIALIZE_TASK.toString()
-                + " for " + context.task.getTaskID().toString());
-    }
-    runCommand(TaskControllerCommands.INITIALIZE_TASK, 
-        context.env.conf.getUser(),
-        buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
+  public void deleteAsUser(String user, String subDir) throws IOException {
+    String[] command = 
+      new String[]{taskControllerExe, 
+                   user,
+                   Integer.toString(Commands.DELETE_AS_USER.getValue()),
+                   subDir};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
+    shExec.execute();
   }
 
-  /**
-   * Builds the args to be passed to task-controller for enabling of task for
-   * cleanup. Last arg in this List is either $attemptId or $attemptId/work
-   */
-  private List<String> buildTaskCleanupArgs(
-      TaskControllerTaskPathDeletionContext context) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    commandArgs.add(context.mapredLocalDir.toUri().getPath());
-    commandArgs.add(context.task.getJobID().toString());
-
-    String workDir = "";
-    if (context.isWorkDir) {
-      workDir = "/work";
-    }
-    if (context.task.isTaskCleanupTask()) {
-      commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
-                      + workDir);
-    } else {
-      commandArgs.add(context.task.getTaskID() + workDir);
-    }
-
-    return commandArgs;
-  }
-
-  /**
-   * Builds the args to be passed to task-controller for enabling of job for
-   * cleanup. Last arg in this List is $jobid.
-   */
-  private List<String> buildJobCleanupArgs(
-      TaskControllerJobPathDeletionContext context) {
-    List<String> commandArgs = new ArrayList<String>(2);
-    commandArgs.add(context.mapredLocalDir.toUri().getPath());
-    commandArgs.add(context.jobId.toString());
-
-    return commandArgs;
-  }
-  
-  /**
-   * Enables the task for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
   @Override
-  void enableTaskForCleanup(PathDeletionContext context)
-      throws IOException {
-    if (context instanceof TaskControllerTaskPathDeletionContext) {
-      TaskControllerTaskPathDeletionContext tContext =
-        (TaskControllerTaskPathDeletionContext) context;
-      enablePathForCleanup(tContext, 
-                           TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
-                           buildTaskCleanupArgs(tContext));
-    }
-    else {
-      throw new IllegalArgumentException("PathDeletionContext provided is not "
-          + "TaskControllerTaskPathDeletionContext.");
-    }
+  public void deleteLogAsUser(String user, String subDir) throws IOException {
+    String[] command = 
+      new String[]{taskControllerExe, 
+                   user,
+                   Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
+                   subDir};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
+    shExec.execute();
   }
 
-  /**
-   * Enables the job for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
   @Override
-  void enableJobForCleanup(PathDeletionContext context)
-      throws IOException {
-    if (context instanceof TaskControllerJobPathDeletionContext) {
-      TaskControllerJobPathDeletionContext tContext =
-        (TaskControllerJobPathDeletionContext) context;
-      enablePathForCleanup(tContext, 
-                           TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
-                           buildJobCleanupArgs(tContext));
-    } else {
-      throw new IllegalArgumentException("PathDeletionContext provided is not "
-                  + "TaskControllerJobPathDeletionContext.");
-    }
-  }
-  
-  /**
-   * Enable a path for cleanup
-   * @param c {@link TaskControllerPathDeletionContext} for the path to be 
-   *          cleaned up
-   * @param command {@link TaskControllerCommands} for task/job cleanup
-   * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable 
-   *                    path cleanup
-   */
-  private void enablePathForCleanup(TaskControllerPathDeletionContext c,
-                                    TaskControllerCommands command,
-                                    List<String> cleanupArgs) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
-    }
-
-    if ( c.user != null && c.fs instanceof LocalFileSystem) {
-      try {
-        runCommand(command, c.user, cleanupArgs, null, null);
-      } catch(IOException e) {
-        LOG.warn("Unable to change permissions for " + c.fullPath);
-      }
-    }
-    else {
-      throw new IllegalArgumentException("Either user is null or the " 
-                  + "file system is not local file system.");
-    }
-  }
-
-  private void logOutput(String output) {
-    String shExecOutput = output;
-    if (shExecOutput != null) {
-      for (String str : shExecOutput.split("\n")) {
-        LOG.info(str);
-      }
-    }
-  }
-
-  private String getJobId(TaskControllerContext context) {
-    String taskId = context.task.getTaskID().toString();
-    TaskAttemptID tId = TaskAttemptID.forName(taskId);
-    String jobId = tId.getJobID().toString();
-    return jobId;
-  }
-
-  /**
-   * Returns list of arguments to be passed while launching task VM.
-   * See {@code buildTaskControllerExecutor(TaskControllerCommands, 
-   * String, List<String>, JvmEnv)} documentation.
-   * @param context
-   * @return Argument to be used while launching Task VM
-   */
-  private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    LOG.debug("getting the task directory as: " 
-        + getTaskCacheDirectory(context));
-    LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context)), 
-        context) );
-    commandArgs.add(getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context)), 
-        context));
-    commandArgs.addAll(buildInitializeTaskArgs(context));
-    return commandArgs;
-  }
-
-  // Get the directory from the list of directories configured
-  // in mapred.local.dir chosen for storing data pertaining to
-  // this task.
-  private String getDirectoryChosenForTask(File directory,
-      TaskControllerContext context) {
-    String jobId = getJobId(context);
-    String taskId = context.task.getTaskID().toString();
-    for (String dir : mapredLocalDirs) {
-      File mapredDir = new File(dir);
-      File taskDir =
-          new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
-              .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
-              .getParentFile();
-      if (directory.equals(taskDir)) {
-        return dir;
-      }
-    }
-    
-    LOG.error("Couldn't parse task cache directory correctly");
-    throw new IllegalArgumentException("invalid task cache directory "
-                + directory.getAbsolutePath());
-  }
-
-  @Override
-  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
-      throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to initialize distributed cache for " + context.user
-          + " with localizedBaseDir " + context.localizedBaseDir + 
-          " and uniqueString " + context.uniqueString);
-    }
-    List<String> args = new ArrayList<String>();
-    // Here, uniqueString might start with '-'. Adding -- in front of the 
-    // arguments indicates that they are non-option parameters.
-    args.add("--");
-    args.add(context.localizedBaseDir.toString());
-    args.add(context.uniqueString);
-    runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, 
-        context.user, args, context.workDir, null);
-  }
-
-  @Override
-  public void initializeUser(InitializationContext context)
-      throws IOException {
-    LOG.debug("Going to initialize user directories for " + context.user
-        + " on the TT");
-    runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
-        new ArrayList<String>(), context.workDir, null);
-  }
-
-  /**
-   * Builds the command line for launching/terminating/killing task JVM.
-   * Following is the format for launching/terminating/killing task JVM
-   * <br/>
-   * For launching following is command line argument:
-   * <br/>
-   * {@code user-name command tt-root job_id task_id} 
-   * <br/>
-   * For terminating/killing task jvm.
-   * {@code user-name command tt-root task-pid}
-   * 
-   * @param command command to be executed.
-   * @param userName user name
-   * @param cmdArgs list of extra arguments
-   * @param workDir working directory for the task-controller
-   * @param env JVM environment variables.
-   * @return {@link ShellCommandExecutor}
-   * @throws IOException
-   */
-  private ShellCommandExecutor buildTaskControllerExecutor(
-      TaskControllerCommands command, String userName, List<String> cmdArgs,
-      File workDir, Map<String, String> env)
-      throws IOException {
-    String[] taskControllerCmd = new String[3 + cmdArgs.size()];
-    taskControllerCmd[0] = getTaskControllerExecutablePath();
-    taskControllerCmd[1] = userName;
-    taskControllerCmd[2] = String.valueOf(command.ordinal());
-    int i = 3;
-    for (String cmdArg : cmdArgs) {
-      taskControllerCmd[i++] = cmdArg;
-    }
-    if (LOG.isDebugEnabled()) {
-      for (String cmd : taskControllerCmd) {
-        LOG.debug("taskctrl command = " + cmd);
-      }
-    }
-    ShellCommandExecutor shExec = null;
-    if(workDir != null && workDir.exists()) {
-      shExec = new ShellCommandExecutor(taskControllerCmd,
-          workDir, env);
-    } else {
-      shExec = new ShellCommandExecutor(taskControllerCmd);
-    }
-    
-    return shExec;
-  }
-  
-  // Return the task specific directory under the cache.
-  private String getTaskCacheDirectory(TaskControllerContext context) {
-    // In the case of JVM reuse, the task specific directory
-    // is different from what is set with respect with
-    // env.workDir. Hence building this from the taskId everytime.
-    String taskId = context.task.getTaskID().toString();
-    File cacheDirForJob = context.env.workDir.getParentFile().getParentFile();
-    if(context.task.isTaskCleanupTask()) {
-      taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
-    }
-    return new File(cacheDirForJob, taskId).getAbsolutePath(); 
-  }
-  
-  // Write the JVM command line to a file under the specified directory
-  // Note that the JVM will be launched using a setuid executable, and
-  // could potentially contain strings defined by a user. Hence, to
-  // prevent special character attacks, we write the command line to
-  // a file and execute it.
-  private void writeCommand(String cmdLine, 
-                                      String directory) throws IOException {
-    
-    PrintWriter pw = null;
-    String commandFile = directory + File.separator + COMMAND_FILE;
-    LOG.info("Writing commands to " + commandFile);
-    try {
-      FileWriter fw = new FileWriter(commandFile);
-      BufferedWriter bw = new BufferedWriter(fw);
-      pw = new PrintWriter(bw);
-      pw.write(cmdLine);
-    } catch (IOException ioe) {
-      LOG.error("Caught IOException while writing JVM command line to file. "
-                + ioe.getMessage());
-    } finally {
-      if (pw != null) {
-        pw.close();
-      }
-      // set execute permissions for all on the file.
-      File f = new File(commandFile);
-      if (f.exists()) {
-        f.setReadable(true, false);
-        f.setExecutable(true, false);
-      }
-    }
-  }
-
-  protected String getTaskControllerExecutablePath() {
-    return taskControllerExe;
-  }  
-
-  private List<String> buildInitializeJobCommandArgs(
-      JobInitializationContext context) {
-    List<String> initJobCmdArgs = new ArrayList<String>();
-    initJobCmdArgs.add(context.jobid.toString());
-    return initJobCmdArgs;
-  }
-
-  @Override
-  void initializeJob(JobInitializationContext context)
-      throws IOException {
-    LOG.debug("Going to initialize job " + context.jobid.toString()
-        + " on the TT");
-    runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
-        buildInitializeJobCommandArgs(context), context.workDir, null);
-  }
-  
-  /**
-   * API which builds the command line to be pass to LinuxTaskController
-   * binary to terminate/kill the task. See 
-   * {@code buildTaskControllerExecutor(TaskControllerCommands, 
-   * String, List<String>, JvmEnv)} documentation.
-   * 
-   * 
-   * @param context context of task which has to be passed kill signal.
-   * 
-   */
-  private List<String> buildKillTaskCommandArgs(TaskControllerContext 
-      context){
-    List<String> killTaskJVMArgs = new ArrayList<String>();
-    killTaskJVMArgs.add(context.pid);
-    return killTaskJVMArgs;
-  }
-  
-  /**
-   * Convenience method used to sending appropriate Kill signal to the task 
-   * VM
-   * @param context
-   * @param command
-   * @throws IOException
-   */
-  private void finishTask(TaskControllerContext context,
-      TaskControllerCommands command) throws IOException{
-    if(context.task == null) {
-      LOG.info("Context task null not killing the JVM");
-      return;
-    }
-    ShellCommandExecutor shExec = buildTaskControllerExecutor(
-        command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env.workDir,
-        context.env.env);
+  public void signalTask(String user, int taskPid, 
+                         Signal signal) throws IOException {
+    String[] command = 
+      new String[]{taskControllerExe, 
+                   user,
+                   Integer.toString(Commands.SIGNAL_TASK.getValue()),
+                   Integer.toString(taskPid),
+                   Integer.toString(signal.getValue())};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
     try {
       shExec.execute();
-    } catch (Exception e) {
-      LOG.warn("Output from task-contoller is : " + shExec.getOutput());
-      throw new IOException(e);
-    }
-  }
-  
-  @Override
-  void terminateTask(TaskControllerContext context) {
-    try {
-      finishTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
-    } catch (Exception e) {
-      LOG.warn("Exception thrown while sending kill to the Task VM " + 
-          StringUtils.stringifyException(e));
-    }
-  }
-  
-  @Override
-  void killTask(TaskControllerContext context) {
-    try {
-      finishTask(context, TaskControllerCommands.KILL_TASK_JVM);
-    } catch (Exception e) {
-      LOG.warn("Exception thrown while sending destroy to the Task VM " + 
-          StringUtils.stringifyException(e));
+    } catch (ExitCodeException e) {
+      int ret_code = shExec.getExitCode();
+      if (ret_code != ResultCode.INVALID_TASK_PID.getValue()) {
+        logOutput(shExec.getOutput());
+        throw new IOException("Problem signalling task " + taskPid + " with " +
+                              signal + "; exit = " + ret_code);
+      }
     }
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 04:43:33 2011
@@ -33,7 +33,6 @@ import org.apache.hadoop.filecache.Distr
 import org.apache.hadoop.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -59,6 +58,7 @@ class LocalJobRunner implements JobSubmi
   private int map_tasks = 0;
   private int reduce_tasks = 0;
   final Random rand = new Random();
+  private final TaskController taskController = new DefaultTaskController();
 
   private JobTrackerInstrumentation myMetrics = null;
 
@@ -89,7 +89,7 @@ class LocalJobRunner implements JobSubmi
     private FileSystem localFs;
     boolean killed = false;
     
-    private TrackerDistributedCacheManager trackerDistributerdCacheManager;
+    private TrackerDistributedCacheManager trackerDistributedCacheManager;
     private TaskDistributedCacheManager taskDistributedCacheManager;
     
     // Counters summed over all the map/reduce tasks which
@@ -115,14 +115,13 @@ class LocalJobRunner implements JobSubmi
 
       // Manage the distributed cache.  If there are files to be copied,
       // this will trigger localFile to be re-written again.
-      this.trackerDistributerdCacheManager =
-        new TrackerDistributedCacheManager(conf, new DefaultTaskController());
+      this.trackerDistributedCacheManager =
+        new TrackerDistributedCacheManager(conf, taskController);
       this.taskDistributedCacheManager =
-        trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
-      taskDistributedCacheManager.setup(
-          new LocalDirAllocator("mapred.local.dir"),
-          new File(systemJobDir.toString()),
-          "archive", "archive");
+        trackerDistributedCacheManager.newTaskDistributedCacheManager(jobid, conf);
+      taskDistributedCacheManager.setupCache(
+          "archive",
+          "archive");
       
       if (DistributedCache.getSymlink(conf)) {
         // This is not supported largely because,
@@ -302,7 +301,7 @@ class LocalJobRunner implements JobSubmi
           localFs.delete(localJobFile, true);              // delete local copy
           // Cleanup distributed cache
           taskDistributedCacheManager.release();
-          trackerDistributerdCacheManager.purgeCache();
+          trackerDistributedCacheManager.purgeCache();
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
         }
@@ -395,6 +394,14 @@ class LocalJobRunner implements JobSubmi
       return new MapTaskCompletionEventsUpdate(TaskCompletionEvent.EMPTY_ARRAY,
                                                false);
     }
+
+    @Override
+    public void updatePrivateDistributedCacheSizes(
+                                                   org.apache.hadoop.mapreduce.JobID jobId,
+                                                   long[] sizes)
+                                                                throws IOException {
+      trackerDistributedCacheManager.setArchiveSizes(jobId, sizes);
+    }
     
   }
 
@@ -402,6 +409,7 @@ class LocalJobRunner implements JobSubmi
     this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
     myMetrics = JobTrackerInstrumentation.create(null, new JobConf(conf));
+    taskController.setConf(conf);
   }
 
   // JobSubmissionProtocol methods

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar  4 04:43:33 2011
@@ -128,8 +128,10 @@ class MapTask extends Task {
   
   @Override
   public TaskRunner createRunner(TaskTracker tracker, 
-      TaskTracker.TaskInProgress tip) {
-    return new MapTaskRunner(tip, tracker, this.conf);
+                                 TaskTracker.TaskInProgress tip,
+                                 TaskTracker.RunningJob rjob
+                                 ) throws IOException {
+    return new MapTaskRunner(tip, tracker, this.conf, rjob);
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskRunner.java Fri Mar  4 04:43:33 2011
@@ -24,8 +24,10 @@ import org.apache.hadoop.mapred.TaskTrac
 /** Runs a map task. */
 class MapTaskRunner extends TaskRunner {
   
-  public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) {
-    super(task, tracker, conf);
+  public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf,
+                       TaskTracker.RunningJob rjob) 
+  throws IOException {
+    super(task, tracker, conf, rjob);
   }
   
   /** Delete any temporary files from previous failed attempts. */

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 04:43:33 2011
@@ -172,9 +172,10 @@ class ReduceTask extends Task {
   }
 
   @Override
-  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip) 
-  throws IOException {
-    return new ReduceTaskRunner(tip, tracker, this.conf);
+  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+                                 TaskTracker.RunningJob rjob
+                                 ) throws IOException {
+    return new ReduceTaskRunner(tip, tracker, this.conf, rjob);
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Mar  4 04:43:33 2011
@@ -25,9 +25,10 @@ import org.apache.hadoop.mapred.TaskTrac
 class ReduceTaskRunner extends TaskRunner {
 
   public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker, 
-                          JobConf conf) throws IOException {
+                          JobConf conf, TaskTracker.RunningJob rjob
+                          ) throws IOException {
     
-    super(task, tracker, conf);
+    super(task, tracker, conf, rjob);
   }
 
   /** Assemble all of the map output files */

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 04:43:33 2011
@@ -451,7 +451,9 @@ abstract public class Task implements Wr
   /** Return an approprate thread runner for this task. 
    * @param tip TODO*/
   public abstract TaskRunner createRunner(TaskTracker tracker, 
-      TaskTracker.TaskInProgress tip) throws IOException;
+                                          TaskTracker.TaskInProgress tip, 
+                                          TaskTracker.RunningJob rjob
+                                          ) throws IOException;
 
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 3000;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskController.java Fri Mar  4 04:43:33 2011
@@ -17,19 +17,24 @@
 */
 package org.apache.hadoop.mapred;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.ProcessTree.Signal;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
@@ -48,360 +53,153 @@ import org.apache.hadoop.util.Shell.Shel
 public abstract class TaskController implements Configurable {
   
   private Configuration conf;
-  
+
   public static final Log LOG = LogFactory.getLog(TaskController.class);
   
+  //Name of the executable script that will contain the child
+  // JVM command line. See writeCommand for details.
+  protected static final String COMMAND_FILE = "taskjvm.sh";
+  
+  protected LocalDirAllocator allocator;
+
+  final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
+  FsPermission.createImmutable((short) 0700); // rwx--------
+  
   public Configuration getConf() {
     return conf;
   }
 
-  // The list of directory paths specified in the variable mapred.local.dir.
-  // This is used to determine which among the list of directories is picked up
-  // for storing data for a particular task.
-  protected String[] mapredLocalDirs;
-
   public void setConf(Configuration conf) {
     this.conf = conf;
-    mapredLocalDirs = conf.getStrings("mapred.local.dir");
   }
 
   /**
-   * Sets up the permissions of the following directories on all the configured
-   * disks:
-   * <ul>
-   * <li>mapred-local directories</li>
-   * <li>Hadoop log directories</li>
-   * </ul>
+   * Does initialization and setup.
+   * @param allocator the local dir allocator to use
    */
-  public void setup() throws IOException {
-    for (String localDir : this.mapredLocalDirs) {
-      // Set up the mapred-local directories.
-      File mapredlocalDir = new File(localDir);
-      if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
-        LOG.warn("Unable to create mapred-local directory : "
-            + mapredlocalDir.getPath());
-      } else {
-        Localizer.PermissionsHandler.setPermissions(mapredlocalDir,
-            Localizer.PermissionsHandler.sevenFiveFive);
-      }
-    }
-
-    // Set up the user log directory
-    File taskLog = TaskLog.getUserLogDir();
-    if (!taskLog.exists() && !taskLog.mkdirs()) {
-      LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
-    } else {
-      Localizer.PermissionsHandler.setPermissions(taskLog,
-          Localizer.PermissionsHandler.sevenFiveFive);
-    }
-  }
-
+  public abstract void setup(LocalDirAllocator allocator) throws IOException;
+  
   /**
-   * Take task-controller specific actions to initialize job. This involves
-   * setting appropriate permissions to job-files so as to secure the files to
-   * be accessible only by the user's tasks.
-   * 
+   * Create all of the directories necessary for the job to start and download
+   * all of the job and private distributed cache files.
+   * Creates both the user directories and the job log directory.
+   * @param user the user name
+   * @param jobid the job
+   * @param credentials a filename containing the job secrets
+   * @param jobConf the path to the localized configuration file
+   * @param taskTracker the connection the task tracker
    * @throws IOException
    */
-  abstract void initializeJob(JobInitializationContext context) throws IOException;
-
-  /**
-   * Take task-controller specific actions to initialize the distributed cache
-   * file. This involves setting appropriate permissions for these files so as
-   * to secure them to be accessible only their owners.
-   * 
-   * @param context
+  public abstract void initializeJob(String user, String jobid, 
+                                     Path credentials, Path jobConf,
+                                     TaskUmbilicalProtocol taskTracker) 
+  throws IOException;
+  
+  /**
+   * Create all of the directories for the task and launches the child jvm.
+   * @param user the user name
+   * @param jobId the jobId in question
+   * @param attemptId the attempt id (cleanup attempts have .cleanup suffix)
+   * @param setup list of shell commands to execute before the jvm
+   * @param jvmArguments list of jvm arguments
+   * @param currentWorkDirectory the full path of the cwd for the task
+   * @param stdout the file to redirect stdout to
+   * @param stderr the file to redirect stderr to
+   * @return the exit code for the task
    * @throws IOException
    */
-  public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
-      throws IOException;
-
-  /**
-   * Launch a task JVM
-   * 
-   * This method defines how a JVM will be launched to run a task. Each
-   * task-controller should also do an
-   * {@link #initializeTask(TaskControllerContext)} inside this method so as to
-   * initialize the task before launching it. This is for reasons of
-   * task-controller specific optimizations w.r.t combining initialization and
-   * launching of tasks.
-   * 
-   * @param context the context associated to the task
-   */
-  abstract void launchTaskJVM(TaskControllerContext context)
-                                      throws IOException;
-
-  /**
-   * Top level cleanup a task JVM method.
-   *
-   * The current implementation does the following.
-   * <ol>
-   * <li>Sends a graceful terminate signal to task JVM allowing its sub-process
-   * to cleanup.</li>
-   * <li>Waits for stipulated period</li>
-   * <li>Sends a forceful kill signal to task JVM, terminating all its
-   * sub-process forcefully.</li>
-   * </ol>
-   * 
-   * @param context the task for which kill signal has to be sent.
-   */
-  final void destroyTaskJVM(TaskControllerContext context) {
-    terminateTask(context);
-    try {
-      Thread.sleep(context.sleeptimeBeforeSigkill);
-    } catch (InterruptedException e) {
-      LOG.warn("Sleep interrupted : " + 
-          StringUtils.stringifyException(e));
-    }
-    killTask(context);
-  }
-
-  /** Perform initializing actions required before a task can run.
-    * 
-    * For instance, this method can be used to setup appropriate
-    * access permissions for files and directories that will be
-    * used by tasks. Tasks use the job cache, log, and distributed cache
-    * directories and files as part of their functioning. Typically,
-    * these files are shared between the daemon and the tasks
-    * themselves. So, a TaskController that is launching tasks
-    * as different users can implement this method to setup
-    * appropriate ownership and permissions for these directories
-    * and files.
-    */
-  abstract void initializeTask(TaskControllerContext context)
-      throws IOException;
-
-  /**
-   * Contains task information required for the task controller.  
+  public abstract
+  int launchTask(String user, 
+                 String jobId,
+                 String attemptId,
+                 List<String> setup,
+                 List<String> jvmArguments,
+                 File currentWorkDirectory,
+                 String stdout,
+                 String stderr) throws IOException;
+  
+  /**
+   * Send a signal to a task pid as the user.
+   * @param user the user name
+   * @param taskPid the pid of the task
+   * @param signal the id of the signal to send
+   */
+  public abstract void signalTask(String user, int taskPid, 
+                                  Signal signal) throws IOException;
+  
+  /**
+   * Delete the user's files under all of the task tracker root directories.
+   * @param user the user name
+   * @param subDir the path relative to the user's subdirectory under
+   *        the task tracker root directories.
+   * @throws IOException
    */
-  static class TaskControllerContext {
-    // task being executed
-    Task task;
-    ShellCommandExecutor shExec;     // the Shell executor executing the JVM for this task.
-
-    // Information used only when this context is used for launching new tasks.
-    JvmEnv env;     // the JVM environment for the task.
-
-    // Information used only when this context is used for destroying a task jvm.
-    String pid; // process handle of task JVM.
-    long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
-  }
-
+  public abstract void deleteAsUser(String user, 
+                                    String subDir) throws IOException;
+  
   /**
-   * Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the file/dir
+   * Delete the user's files under the userlogs directory.
+   * @param user the user to work as
+   * @param subDir the path under the userlogs directory.
+   * @throws IOException
    */
-  static abstract class TaskControllerPathDeletionContext 
-  extends PathDeletionContext {
-    TaskController taskController;
-    String user;
-
-    /**
-     * mapredLocalDir is the base dir under which to-be-deleted jobLocalDir, 
-     * taskWorkDir or taskAttemptDir exists. fullPath of jobLocalDir, 
-     * taskAttemptDir or taskWorkDir is built using mapredLocalDir, jobId, 
-     * taskId, etc.
-     */
-    Path mapredLocalDir;
-
-    public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
-                                             TaskController taskController,
-                                             String user) {
-      super(fs, null);
-      this.taskController = taskController;
-      this.mapredLocalDir = mapredLocalDir;
+  public abstract void deleteLogAsUser(String user, 
+                                       String subDir) throws IOException;
+  
+  static class DeletionContext extends CleanupQueue.PathDeletionContext {
+    private TaskController controller;
+    private boolean isLog;
+    private String user;
+    private String subDir;
+    DeletionContext(TaskController controller, boolean isLog, String user, 
+                    String subDir) {
+      super(null, null);
+      this.controller = controller;
+      this.isLog = isLog;
       this.user = user;
+      this.subDir = subDir;
     }
-
-    @Override
-    protected String getPathForCleanup() {
-      if (fullPath == null) {
-        fullPath = buildPathForDeletion();
-      }
-      return fullPath;
-    }
-
-    /**
-     * Return the component of the path under the {@link #mapredLocalDir} to be 
-     * cleaned up. Its the responsibility of the class that extends 
-     * {@link TaskControllerPathDeletionContext} to provide the correct 
-     * component. For example 
-     *  - For task related cleanups, either the task-work-dir or task-local-dir
-     *    might be returned depending on jvm reuse.
-     *  - For job related cleanup, simply the job-local-dir might be returned.
-     */
-    abstract protected String getPath();
-    
-    /**
-     * Builds the path of taskAttemptDir OR taskWorkDir based on
-     * mapredLocalDir, jobId, taskId, etc
-     */
-    String buildPathForDeletion() {
-      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + getPath();
-    }
-  }
-
-  /** Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the task-work-dir or
-   * task-local-dir depending on whether the jvm is reused or not.
-   */
-  static class TaskControllerTaskPathDeletionContext 
-  extends TaskControllerPathDeletionContext {
-    final Task task;
-    final boolean isWorkDir;
     
-    public TaskControllerTaskPathDeletionContext(FileSystem fs, 
-        Path mapredLocalDir, Task task, boolean isWorkDir, 
-        TaskController taskController) {
-      super(fs, mapredLocalDir, taskController, task.getUser());
-      this.task = task;
-      this.isWorkDir = isWorkDir;
-    }
-    
-    /**
-     * Returns the taskWorkDir or taskLocalDir based on whether 
-     * {@link TaskControllerTaskPathDeletionContext} is configured to delete
-     * the workDir.
-     */
-    @Override
-    protected String getPath() {
-      String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
-          task.getJobID().toString(), task.getTaskID().toString(),
-          task.isTaskCleanupTask())
-        : TaskTracker.getLocalTaskDir(task.getUser(),
-          task.getJobID().toString(), task.getTaskID().toString(),
-          task.isTaskCleanupTask());
-      return subDir;
-    }
-
-    /**
-     * Makes the path(and its subdirectories recursively) fully deletable by
-     * setting proper permissions(770) by task-controller
-     */
     @Override
-    protected void enablePathForCleanup() throws IOException {
-      getPathForCleanup();// allow init of fullPath, if not inited already
-      if (fs.exists(new Path(fullPath))) {
-        taskController.enableTaskForCleanup(this);
+    protected void deletePath() throws IOException {
+      if (isLog) {
+        controller.deleteLogAsUser(user, subDir);
+      } else {
+        controller.deleteAsUser(user, subDir);
       }
     }
   }
 
-  /** Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the job-local-dir.
-   */
-  static class TaskControllerJobPathDeletionContext 
-  extends TaskControllerPathDeletionContext {
-    final JobID jobId;
-    
-    public TaskControllerJobPathDeletionContext(FileSystem fs, 
-        Path mapredLocalDir, JobID id, String user, 
-        TaskController taskController) {
-      super(fs, mapredLocalDir, taskController, user);
-      this.jobId = id;
-    }
-    
-    /**
-     * Returns the jobLocalDir of the job to be cleaned up.
-     */
-    @Override
-    protected String getPath() {
-      return TaskTracker.getLocalJobDir(user, jobId.toString());
-    }
-    
-    /**
-     * Makes the path(and its sub-directories recursively) fully deletable by
-     * setting proper permissions(770) by task-controller
-     */
-    @Override
-    protected void enablePathForCleanup() throws IOException {
-      getPathForCleanup();// allow init of fullPath, if not inited already
-      if (fs.exists(new Path(fullPath))) {
-        taskController.enableJobForCleanup(this);
+  //Write the JVM command line to a file under the specified directory
+  // Note that the JVM will be launched using a setuid executable, and
+  // could potentially contain strings defined by a user. Hence, to
+  // prevent special character attacks, we write the command line to
+  // a file and execute it.
+  protected static String writeCommand(String cmdLine, FileSystem fs,
+      Path commandFile) throws IOException {
+    PrintWriter pw = null;
+    LOG.info("Writing commands to " + commandFile);
+    try {
+      pw = new PrintWriter(FileSystem.create(
+            fs, commandFile, TASK_LAUNCH_SCRIPT_PERMISSION));
+      pw.write(cmdLine);
+    } catch (IOException ioe) {
+      LOG.error("Caught IOException while writing JVM command line to file. ",
+          ioe);
+    } finally {
+      if (pw != null) {
+        pw.close();
       }
     }
+    return commandFile.makeQualified(fs).toUri().getPath();
   }
   
-  /**
-   * NOTE: This class is internal only class and not intended for users!!
-   * 
-   */
-  public static class InitializationContext {
-    public File workDir;
-    public String user;
-    
-    public InitializationContext() {
-    }
-    
-    public InitializationContext(String user, File workDir) {
-      this.user = user;
-      this.workDir = workDir;
-    }
-  }
-  
-  /**
-   * This is used for initializing the private localized files in distributed
-   * cache. Initialization would involve changing permission, ownership and etc.
-   */
-  public static class DistributedCacheFileContext extends InitializationContext {
-    // base directory under which file has been localized
-    Path localizedBaseDir;
-    // the unique string used to construct the localized path
-    String uniqueString;
-
-    public DistributedCacheFileContext(String user, File workDir,
-        Path localizedBaseDir, String uniqueString) {
-      super(user, workDir);
-      this.localizedBaseDir = localizedBaseDir;
-      this.uniqueString = uniqueString;
-    }
-
-    public Path getLocalizedUniqueDir() {
-      return new Path(localizedBaseDir, new Path(TaskTracker
-          .getPrivateDistributedCacheDir(user), uniqueString));
+  protected void logOutput(String output) {
+    String shExecOutput = output;
+    if (shExecOutput != null) {
+      for (String str : shExecOutput.split("\n")) {
+        LOG.info(str);
+      }
     }
   }
-
-  static class JobInitializationContext extends InitializationContext {
-    JobID jobid;
-  }
-
-  /**
-   * Sends a graceful terminate signal to taskJVM and it sub-processes. 
-   *   
-   * @param context task context
-   */
-  abstract void terminateTask(TaskControllerContext context);
-  
-  /**
-   * Enable the task for cleanup by changing permissions of the path
-   * @param context   path deletion context
-   * @throws IOException
-   */
-  abstract void enableTaskForCleanup(PathDeletionContext context)
-      throws IOException;
-  /**
-   * Sends a KILL signal to forcefully terminate the taskJVM and its
-   * sub-processes.
-   * 
-   * @param context task context
-   */
-  abstract void killTask(TaskControllerContext context);
-
-  /**
-   * Initialize user on this TaskTracer in a TaskController specific manner.
-   * 
-   * @param context
-   * @throws IOException
-   */
-  public abstract void initializeUser(InitializationContext context)
-      throws IOException;
-  
-  /**
-   * Enable the job for cleanup by changing permissions of the path
-   * @param context   path deletion context
-   * @throws IOException
-   */
-  abstract void enableJobForCleanup(PathDeletionContext context)
-    throws IOException;
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Mar  4 04:43:33 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -158,7 +159,14 @@ public class TaskLog {
 
   static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
     String cleanupSuffix = isCleanup ? ".cleanup" : "";
-    return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
+    return getAttemptDir(taskid.getJobID().toString(), 
+        taskid.toString() + cleanupSuffix);
+  }
+  
+  static File getAttemptDir(String jobid, String taskid) {
+    // taskid should be fully formed and it should have the optional 
+    // .cleanup suffix
+    return new File(getJobDir(jobid), taskid);
   }
 
   static final List<LogName> LOGS_TRACKED_BY_INDEX_FILES =
@@ -232,6 +240,9 @@ public class TaskLog {
         }
       }
     }
+    if (currentTaskid == null) {
+      currentTaskid = taskid;
+    }
     // set start and end
     for (LogName logName : LOGS_TRACKED_BY_INDEX_FILES) {
       if (currentTaskid != taskid) {
@@ -493,7 +504,8 @@ public class TaskLog {
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
-    String mergedCmd = buildCommandLine(setup, cmd,
+    String mergedCmd = buildCommandLine(setup,
+        cmd,
         stdoutFilename,
         stderrFilename, tailLength,
         useSetsid);
@@ -511,15 +523,17 @@ public class TaskLog {
     
     String stdout = FileUtil.makeShellPath(stdoutFilename);
     String stderr = FileUtil.makeShellPath(stderrFilename);
-    StringBuffer mergedCmd = new StringBuffer();
+    StringBuilder mergedCmd = new StringBuilder();
     
     if (!Shell.WINDOWS) {
-      mergedCmd.append(" export JVM_PID=`echo $$` ; ");
+      mergedCmd.append("export JVM_PID=`echo $$`\n");
     }
 
-    if (setup != null && setup.size() > 0) {
-      mergedCmd.append(addCommand(setup, false));
-      mergedCmd.append(";");
+    if (setup != null) {
+      for (String s : setup) {
+        mergedCmd.append(s);
+        mergedCmd.append("\n");
+      }
     }
     if (tailLength > 0) {
       mergedCmd.append("(");
@@ -627,11 +641,21 @@ public class TaskLog {
   /**
    * Get the user log directory for the job jobid.
    * 
-   * @param jobid
+   * @param jobid string representation of the jobid
+   * @return user log directory for the job
+   */
+  public static File getJobDir(String jobid) {
+    return new File(getUserLogDir(), jobid);
+  }
+  
+  /**
+   * Get the user log directory for the job jobid.
+   * 
+   * @param jobid the jobid object
    * @return user log directory for the job
    */
   public static File getJobDir(JobID jobid) {
-    return new File(getUserLogDir(), jobid.toString());
+    return getJobDir(jobid.toString());
   }
 
 } // TaskLog

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Fri Mar  4 04:43:33 2011
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.File;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -184,13 +183,8 @@ class TaskMemoryManagerThread extends Th
               // itself is still retained in runningTasks till successful
               // transmission to JT
 
-              // create process tree object
-              long sleeptimeBeforeSigkill = taskTracker.getJobConf().getLong(
-                  "mapred.tasktracker.tasks.sleeptime-before-sigkill",
-                  ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-              
-              ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(
-                  pId,ProcessTree.isSetsidAvailable, sleeptimeBeforeSigkill);
+              ProcfsBasedProcessTree pt = 
+                new ProcfsBasedProcessTree(pId, ProcessTree.isSetsidAvailable);
               LOG.debug("Tracking ProcessTree " + pId + " for the first time");
 
               ptInfo.setPid(pId);
@@ -228,10 +222,9 @@ class TaskMemoryManagerThread extends Th
                     + "bytes. Killing task. \nDump of the process-tree for "
                     + tid + " : \n" + pTree.getProcessTreeDump();
             LOG.warn(msg);
+            // kill the task
             taskTracker.cleanUpOverMemoryTask(tid, true, msg);
 
-            // Now destroy the ProcessTree, remove it from monitoring map.
-            pTree.destroy(true/*in the background*/);
             it.remove();
             LOG.info("Removed ProcessTree with root " + pId);
           } else {
@@ -365,8 +358,6 @@ class TaskMemoryManagerThread extends Th
         taskTracker.cleanUpOverMemoryTask(tid, false, msg);
         // Now destroy the ProcessTree, remove it from monitoring map.
         ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
-        ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
-        pTree.destroy(true/*in the background*/);
         processTreeInfoMap.remove(tid);
         LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
       }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=1077679&r1=1077678&r2=1077679&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Mar  4 04:43:33 2011
@@ -20,16 +20,16 @@ package org.apache.hadoop.mapred;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Vector;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.JobContext;
 
 /** Base class that runs a task in a separate process.  Tasks are run in a
  * separate process in order to isolate the map/reduce system code from bugs in
@@ -74,6 +73,8 @@ abstract class TaskRunner extends Thread
 
   static final String DEFAULT_HOME_DIR= "/homes/";
   
+  static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
+  
   static final String MAPRED_ADMIN_USER_ENV =
     "mapreduce.admin.user.env";
 
@@ -88,13 +89,19 @@ abstract class TaskRunner extends Thread
   private int exitCode = -1;
   private boolean exitCodeSet = false;
   
-  private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
+  private static String SYSTEM_PATH_SEPARATOR = 
+    System.getProperty("path.separator");
   static final String MAPREDUCE_USER_CLASSPATH_FIRST =
         "mapreduce.user.classpath.first"; //a semi-hidden config
 
   
   private TaskTracker tracker;
-  private TaskDistributedCacheManager taskDistributedCacheManager;
+  private final TaskDistributedCacheManager taskDistributedCacheManager;
+  private String[] localdirs;
+  final private static Random rand;
+  static {
+    rand = new Random();
+  }
 
   protected JobConf conf;
   JvmManager jvmManager;
@@ -105,7 +112,8 @@ abstract class TaskRunner extends Thread
   protected MapOutputFile mapOutputFile;
 
   public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
-      JobConf conf) {
+                    JobConf conf, TaskTracker.RunningJob rjob
+                    ) throws IOException {
     this.tip = tip;
     this.t = tip.getTask();
     this.tracker = tracker;
@@ -113,6 +121,8 @@ abstract class TaskRunner extends Thread
     this.mapOutputFile = new MapOutputFile();
     this.mapOutputFile.setConf(conf);
     this.jvmManager = tracker.getJvmManagerInstance();
+    this.localdirs = conf.getLocalDirs();
+    taskDistributedCacheManager = rjob.distCacheMgr;
   }
 
   public Task getTask() { return t; }
@@ -182,27 +192,20 @@ abstract class TaskRunner extends Thread
       //all the archives
       TaskAttemptID taskid = t.getTaskID();
       final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
-      final File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
+      //simply get the location of the workDir and pass it to the child. The
+      //child will do the actual dir creation
+      final File workDir =
+      new File(new Path(localdirs[rand.nextInt(localdirs.length)], 
+          TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), 
+          taskid.toString(),
+          t.isTaskCleanupTask())).toString());
       
-      // We don't create any symlinks yet, so presence/absence of workDir
-      // actually on the file system doesn't matter.
       String user = tip.getUGI().getUserName();
-      tip.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
-        public Void run() throws IOException {
-          taskDistributedCacheManager =
-            tracker.getTrackerDistributedCacheManager()
-            .newTaskDistributedCacheManager(conf);
-          taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
-              .getPrivateDistributedCacheDir(conf.getUser()),
-                   TaskTracker.getPublicDistributedCacheDir());
-          return null;
-        }
-      });
       
       // Set up the child task's configuration. After this call, no localization
       // of files should happen in the TaskTracker's process space. Any changes to
       // the conf object after this will NOT be reflected to the child.
-      setupChildTaskConfiguration(lDirAlloc);
+      // setupChildTaskConfiguration(lDirAlloc);
 
       if (!prepare()) {
         return;
@@ -220,7 +223,7 @@ abstract class TaskRunner extends Thread
       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
-      List<String> setup = getVMSetupCmd();
+      String setup = getVMSetupCmd();
       // Set up the redirection of the task's stdout and stderr streams
       File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
       File stdout = logFiles[0];
@@ -231,8 +234,21 @@ abstract class TaskRunner extends Thread
       Map<String, String> env = new HashMap<String, String>();
       errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
                                    logSize);
-
-      launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);
+      
+      // flatten the env as a set of export commands
+      List <String> setupCmds = new ArrayList<String>();
+      for(Entry<String, String> entry : env.entrySet()) {
+        StringBuffer sb = new StringBuffer();
+        sb.append("export ");
+        sb.append(entry.getKey());
+        sb.append("=\"");
+        sb.append(entry.getValue());
+        sb.append("\"");
+        setupCmds.add(sb.toString());
+      }
+      setupCmds.add(setup);
+      
+      launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
       if (exitCodeSet) {
         if (!killed && exitCode != 0) {
@@ -261,13 +277,6 @@ abstract class TaskRunner extends Thread
         LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
       }
     } finally {
-      try{
-        if (taskDistributedCacheManager != null) {
-          taskDistributedCacheManager.release();
-        }
-      }catch(IOException ie){
-        LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
-      }
       
       // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
       // *false* since the task has either
@@ -277,11 +286,11 @@ abstract class TaskRunner extends Thread
     }
   }
 
-  void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
-      File stderr, long logSize, File workDir, Map<String, String> env)
-      throws InterruptedException {
+  void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
+      File stderr, long logSize, File workDir)
+      throws InterruptedException, IOException {
     jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
-        stderr, logSize, workDir, env, conf));
+        stderr, logSize, workDir, conf));
     synchronized (lock) {
       while (!done) {
         lock.wait();
@@ -332,7 +341,7 @@ abstract class TaskRunner extends Thread
                 .isTaskCleanupTask()), conf);
 
     // write the child's task configuration file to the local disk
-    writeLocalTaskFile(localTaskFile.toString(), conf);
+    JobLocalizer.writeLocalJobFile(localTaskFile, conf);
 
     // Set the final job file in the task. The child needs to know the correct
     // path to job.xml. So set this path accordingly.
@@ -342,16 +351,21 @@ abstract class TaskRunner extends Thread
   /**
    * @return
    */
-  private List<String> getVMSetupCmd() {
-    String[] ulimitCmd = Shell.getUlimitMemoryCommand(getChildUlimit(conf));
-    List<String> setup = null;
-    if (ulimitCmd != null) {
-      setup = new ArrayList<String>();
-      for (String arg : ulimitCmd) {
-        setup.add(arg);
-      }
+  private String getVMSetupCmd() {
+    final int ulimit = getChildUlimit(conf);
+    if (ulimit <= 0) {
+      return "";
+    }
+    String setup[] = Shell.getUlimitMemoryCommand(ulimit);
+    StringBuilder command = new StringBuilder();
+    for (String str : setup) {
+      command.append('\'');
+      command.append(str);
+      command.append('\'');
+      command.append(" ");
     }
-    return setup;
+    command.append("\n");
+    return command.toString();
   }
 
   /**
@@ -434,7 +448,7 @@ abstract class TaskRunner extends Thread
       vargs.add(javaOptsSplit[i]);
     }
 
-    Path childTmpDir = createChildTmpDir(workDir, conf);
+    Path childTmpDir = createChildTmpDir(workDir, conf, false);
     vargs.add("-Djava.io.tmpdir=" + childTmpDir);
 
     // Add classpath.
@@ -483,7 +497,7 @@ abstract class TaskRunner extends Thread
    * @throws IOException
    */
   static Path createChildTmpDir(File workDir,
-      JobConf conf)
+      JobConf conf, boolean createDir)
       throws IOException {
 
     // add java.io.tmpdir given by mapred.child.tmp
@@ -493,10 +507,13 @@ abstract class TaskRunner extends Thread
     // if temp directory path is not absolute, prepend it with workDir.
     if (!tmpDir.isAbsolute()) {
       tmpDir = new Path(workDir.toString(), tmp);
-
-      FileSystem localFs = FileSystem.getLocal(conf);
-      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
-        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+      if (createDir) {
+        FileSystem localFs = FileSystem.getLocal(conf);
+        if (!localFs.mkdirs(tmpDir) && 
+            !localFs.getFileStatus(tmpDir).isDir()) {
+          throw new IOException("Mkdirs failed to create " +
+              tmpDir.toString());
+        }
       }
     }
     return tmpDir;
@@ -535,9 +552,10 @@ abstract class TaskRunner extends Thread
     return classPaths;
   }
 
-  private String getVMEnvironment(String errorInfo, String user, File workDir, JobConf conf,
-      Map<String, String> env, TaskAttemptID taskid, long logSize)
-      throws Throwable {
+  private String getVMEnvironment(String errorInfo, String user, File workDir, 
+                                  JobConf conf, Map<String, String> env, 
+                                  TaskAttemptID taskid, long logSize
+                                  ) throws Throwable {
     StringBuffer ldLibraryPath = new StringBuffer();
     ldLibraryPath.append(workDir.toString());
     String oldLdLibraryPath = null;
@@ -547,11 +565,13 @@ abstract class TaskRunner extends Thread
       ldLibraryPath.append(oldLdLibraryPath);
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+    env.put(HADOOP_WORK_DIR, workDir.toString());
     //update user configured login-shell properties
     updateUserLoginEnv(errorInfo, user, conf, env);
+    // put jobTokenFile name into env
     String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
-    LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
-    env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);    
+    LOG.debug("putting jobToken file name into environment " + jobTokenFile);
+    env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);
     // for the child of task jvm, set hadoop.root.logger
     env.put("HADOOP_ROOT_LOGGER","INFO,TLA");
     String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
@@ -560,9 +580,9 @@ abstract class TaskRunner extends Thread
     } else {
       hadoopClientOpts = hadoopClientOpts + " ";
     }
-    hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid
-                       + " -Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask()
-                       + " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
+    hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" +
+      taskid + " -Dhadoop.tasklog.iscleanup=" + t.isTaskCleanupTask() +
+      " -Dhadoop.tasklog.totalLogFileSize=" + logSize;
     // following line is a backport from jira MAPREDUCE-1286 
     env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
 
@@ -639,25 +659,6 @@ abstract class TaskRunner extends Thread
   }
 
   /**
-   * Write the task specific job-configuration file.
-   * 
-   * @param localFs
-   * @throws IOException
-   */
-  private static void writeLocalTaskFile(String jobFile, JobConf conf)
-      throws IOException {
-    Path localTaskFile = new Path(jobFile);
-    FileSystem localFs = FileSystem.getLocal(conf);
-    localFs.delete(localTaskFile, true);
-    OutputStream out = localFs.create(localTaskFile);
-    try {
-      conf.writeXml(out);
-    } finally {
-      out.close();
-    }
-  }
-
-  /**
    * Prepare the mapred.local.dir for the child. The child is sand-boxed now.
    * Whenever it uses LocalDirAllocator from now on inside the child, it will
    * only see files inside the attempt-directory. This is done in the Child's
@@ -681,15 +682,11 @@ abstract class TaskRunner extends Thread
   }
 
   /** Creates the working directory pathname for a task attempt. */ 
-  static File formWorkDir(LocalDirAllocator lDirAlloc, 
-      TaskAttemptID task, boolean isCleanup, JobConf conf) 
+  static Path formWorkDir(LocalDirAllocator lDirAlloc, JobConf conf) 
       throws IOException {
     Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
-            conf.getUser(), task.getJobID().toString(), task.toString(),
-            isCleanup), conf);
-
-    return new File(workDir.toString());
+        lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR, conf);
+    return workDir;
   }
 
   private static void appendSystemClasspaths(List<String> classPaths) {
@@ -780,7 +777,7 @@ abstract class TaskRunner extends Thread
       }
     }
 
-    createChildTmpDir(workDir, conf);
+    createChildTmpDir(workDir, conf, true);
   }
 
   /**
@@ -804,8 +801,10 @@ abstract class TaskRunner extends Thread
 
   /**
    * Kill the child process
+   * @throws InterruptedException 
+   * @throws IOException 
    */
-  public void kill() {
+  public void kill() throws IOException, InterruptedException {
     killed = true;
     jvmManager.taskKilled(this);
     signalDone();