You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:56:31 UTC

svn commit: r1079211 [8/11] - in /hadoop/mapreduce/branches/yahoo-merge: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/fairscheduler/designdoc/ src/contrib/streaming/s...

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/JvmManager.java Tue Mar  8 05:56:27 2011
@@ -30,13 +30,17 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DelayedProcessKiller;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
+import static org.apache.hadoop.mapred.TaskController.Signal;
 
 class JvmManager {
 
@@ -49,8 +53,8 @@ class JvmManager {
   
   public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
       File stdout,File stderr,long logSize, File workDir, 
-      Map<String,String> env, JobConf conf) {
-    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,conf);
+      JobConf conf) {
+    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,conf);
   }
   
   public JvmManager(TaskTracker tracker) {
@@ -69,7 +73,7 @@ class JvmManager {
     return null;
   }
   
-  public void stop() {
+  public void stop() throws IOException, InterruptedException {
     mapJvmManager.stop();
     reduceJvmManager.stop();
   }
@@ -114,7 +118,8 @@ class JvmManager {
     return null;
   }
   
-  public void launchJvm(TaskRunner t, JvmEnv env) {
+  public void launchJvm(TaskRunner t, JvmEnv env)
+      throws IOException, InterruptedException {
     if (t.getTask().isMapTask()) {
       mapJvmManager.reapJvm(t, env);
     } else {
@@ -138,7 +143,8 @@ class JvmManager {
     }
   }
 
-  public void taskKilled(TaskRunner tr) {
+  public void taskKilled(TaskRunner tr
+                         ) throws IOException, InterruptedException {
     if (tr.getTask().isMapTask()) {
       mapJvmManager.taskKilled(tr);
     } else {
@@ -146,15 +152,7 @@ class JvmManager {
     }
   }
 
-  void dumpStack(TaskRunner tr) {
-    if (tr.getTask().isMapTask()) {
-      mapJvmManager.dumpStack(tr);
-    } else {
-      reduceJvmManager.dumpStack(tr);
-    }
-  }
-
-  public void killJvm(JVMId jvmId) {
+  public void killJvm(JVMId jvmId) throws IOException, InterruptedException {
     if (jvmId.isMap) {
       mapJvmManager.killJvm(jvmId);
     } else {
@@ -167,15 +165,19 @@ class JvmManager {
    * asynchronous deletion of work dir.
    * @param tracker taskTracker
    * @param task    the task whose work dir needs to be deleted
-   * @throws IOException
    */
-  static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
+  static void deleteWorkDir(TaskTracker tracker, Task task) {
+    String user = task.getUser();
+    String jobid = task.getJobID().toString();
+    String taskid = task.getTaskID().toString();
+    String workDir = TaskTracker.getTaskWorkDir(user, jobid, taskid, 
+                                                task.isTaskCleanupTask());
+    String userDir = TaskTracker.getUserDir(user);
     tracker.getCleanupThread().addToQueue(
-        TaskTracker.buildTaskControllerTaskPathDeletionContexts(
-          tracker.getLocalFileSystem(),
-          tracker.getLocalFiles(tracker.getJobConf(), ""),
-          task, true /* workDir */,
-          tracker.getTaskController()));
+     new TaskController.DeletionContext(tracker.getTaskController(), false,
+                                        user, 
+                                        workDir, tracker.getLocalDirs()));
+                                           
   }
 
   static class JvmManagerForType {
@@ -192,18 +194,25 @@ class JvmManager {
     Map <JVMId, String> jvmIdToPid = 
       new HashMap<JVMId, String>();
     
-    int maxJvms;
-    boolean isMap;
-    
-    TaskTracker tracker;
-    
-    Random rand = new Random(System.currentTimeMillis());
+    final int maxJvms;
+    final boolean isMap;
+    final TaskTracker tracker;
+    final long sleeptimeBeforeSigkill;
+    final Random rand = new Random();
+
+    static final String DELAY_BEFORE_KILL_KEY =
+      "mapred.tasktracker.tasks.sleeptime-before-sigkill";
+    // number of milliseconds to wait between TERM and KILL.
+    private static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 250;
 
     public JvmManagerForType(int maxJvms, boolean isMap, 
         TaskTracker tracker) {
       this.maxJvms = maxJvms;
       this.isMap = isMap;
       this.tracker = tracker;
+      sleeptimeBeforeSigkill =
+        tracker.getJobConf().getLong(DELAY_BEFORE_KILL_KEY,
+                                     DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
     }
 
     synchronized public void setRunningTaskForJvm(JVMId jvmId, 
@@ -215,36 +224,20 @@ class JvmManager {
     
     synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
         throws IOException {
-      if (jvmToRunningTask.containsKey(jvmId)) {
-        //Incase of JVM reuse, tasks are returned to previously launched
-        //JVM via this method. However when a new task is launched
-        //the task being returned has to be initialized.
-        TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
-        JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
-        Task task = taskRunner.getTaskInProgress().getTask();
-
-        // Initialize task dirs
-        TaskControllerContext context =
-            new TaskController.TaskControllerContext();
-        context.env = jvmRunner.env;
-        context.task = task;
-        // If we are returning the same task as which the JVM was launched
-        // we don't initialize task once again.
-        if (!jvmRunner.env.conf.get(JobContext.TASK_ATTEMPT_ID).equals(
-            task.getTaskID().toString())) {
-          try {
-            tracker.getTaskController().initializeTask(context);
-          } catch (IOException e) {
-            LOG.warn("Failed to initialize the new task "
-                + task.getTaskID().toString() + " to be given to JVM with id "
-                + jvmId);
-            throw e;
-          }
-        }
-
-        return taskRunner.getTaskInProgress();
-      }
-      return null;
+      final TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+      return null == taskRunner ? null : taskRunner.getTaskInProgress();
+      //if (jvmToRunningTask.containsKey(jvmId)) {
+      //  //Incase of JVM reuse, tasks are returned to previously launched
+      //  //JVM via this method. However when a new task is launched
+      //  //the task being returned has to be initialized.
+      //  TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+      //  // TODO retained for MAPREDUCE-1100
+      //  JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+      //  Task task = taskRunner.getTaskInProgress().getTask();
+
+      //  return taskRunner.getTaskInProgress();
+      //}
+      //return null;
     }
     
     synchronized public boolean isJvmknown(JVMId jvmId) {
@@ -262,7 +255,9 @@ class JvmManager {
       }
     }
 
-    synchronized public void taskKilled(TaskRunner tr) {
+    synchronized public void taskKilled(TaskRunner tr
+                                        ) throws IOException,
+                                                 InterruptedException {
       JVMId jvmId = runningTaskToJvm.remove(tr);
       if (jvmId != null) {
         jvmToRunningTask.remove(jvmId);
@@ -270,29 +265,22 @@ class JvmManager {
       }
     }
 
-    synchronized public void killJvm(JVMId jvmId) {
+    synchronized public void killJvm(JVMId jvmId)
+        throws IOException, InterruptedException {
       JvmRunner jvmRunner;
       if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
         killJvmRunner(jvmRunner);
       }
     }
     
-    private synchronized void killJvmRunner(JvmRunner jvmRunner) {
+    private synchronized void killJvmRunner(JvmRunner jvmRunner)
+        throws IOException, InterruptedException {
       jvmRunner.kill();
       removeJvm(jvmRunner.jvmId);
     }
 
-    synchronized void dumpStack(TaskRunner tr) {
-      JVMId jvmId = runningTaskToJvm.get(tr);
-      if (null != jvmId) {
-        JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
-        if (null != jvmRunner) {
-          jvmRunner.dumpChildStacks();
-        }
-      }
-    }
-
-    synchronized public void stop() {
+    synchronized public void stop()
+        throws IOException, InterruptedException {
       //since the kill() method invoked later on would remove
       //an entry from the jvmIdToRunner map, we create a
       //copy of the values and iterate over it (if we don't
@@ -310,7 +298,7 @@ class JvmManager {
       jvmIdToPid.remove(jvmId);
     }
     private synchronized void reapJvm( 
-        TaskRunner t, JvmEnv env) {
+        TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
       if (t.getTaskInProgress().wasKilled()) {
         //the task was killed in-flight
         //no need to do the rest of the operations
@@ -399,7 +387,7 @@ class JvmManager {
 
     private void spawnNewJvm(JobID jobId, JvmEnv env,  
         TaskRunner t) {
-      JvmRunner jvmRunner = new JvmRunner(env,jobId);
+      JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
       jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
       //spawn the JVM in a new thread. Note that there will be very little
       //extra overhead of launching the new thread for a new JVM since
@@ -435,71 +423,89 @@ class JvmManager {
       JVMId jvmId;
       volatile boolean busy = true;
       private ShellCommandExecutor shexec; // shell terminal for running the task
-      //context used for starting JVM
-      private TaskControllerContext initalContext;
+      private Task firstTask;
       
-      public JvmRunner(JvmEnv env, JobID jobId) {
+      public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) {
         this.env = env;
         this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
         this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
+        this.firstTask = firstTask;
         LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
       }
+
+      @Override
       public void run() {
-        runChild(env);
+        try {
+          runChild(env);
+        } catch (InterruptedException ie) {
+          return;
+        } catch (IOException e) {
+          LOG.warn("Caught IOException in JVMRunner", e);
+        } catch (Throwable e) {
+          LOG.error("Caught Throwable in JVMRunner. Aborting TaskTracker.", e);
+          System.exit(1);
+        } finally {
+          // TODO MR-1100
+          //jvmFinished();
+        }
       }
 
-      public void runChild(JvmEnv env) {
-        initalContext = new TaskControllerContext();
+      public void runChild(JvmEnv env)
+          throws IOException, InterruptedException {
+        int exitCode = 0;
         try {
           env.vargs.add(Integer.toString(jvmId.getId()));
-          //Launch the task controller to run task JVM
-          initalContext.task = jvmToRunningTask.get(jvmId).getTask();
-          initalContext.env = env;
-          tracker.getTaskController().launchTaskJVM(initalContext);
+          TaskRunner runner = jvmToRunningTask.get(jvmId);
+          if (runner != null) {
+            Task task = runner.getTask();
+            //Launch the task controller to run task JVM
+            String user = task.getUser();
+            TaskAttemptID taskAttemptId = task.getTaskID();
+            String taskAttemptIdStr = task.isTaskCleanupTask() ? 
+                (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX) :
+                  taskAttemptId.toString(); 
+                exitCode = tracker.getTaskController().launchTask(user,
+                    jvmId.jobId.toString(), taskAttemptIdStr, env.setup,
+                    env.vargs, env.workDir, env.stdout.toString(),
+                    env.stderr.toString());
+          }
         } catch (IOException ioe) {
           // do nothing
           // error and output are appropriately redirected
         } finally { // handle the exit code
-          shexec = initalContext.shExec;
-          if (shexec == null) {
-            return;
-          }
-
+          // although the process has exited before we get here,
+          // make sure the entire process group has also been killed.
           kill();
 
-          int exitCode = shexec.getExitCode();
           updateOnJvmExit(jvmId, exitCode);
           LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
               + ". Number of tasks it ran: " + numTasksRan);
-          try {
-            // In case of jvm-reuse,
-            //the task jvm cleans up the common workdir for every 
-            //task at the beginning of each task in the task JVM.
-            //For the last task, we do it here.
-            if (env.conf.getNumTasksToExecutePerJvm() != 1) {
-              deleteWorkDir(tracker, initalContext.task);
-            }
-          } catch (IOException ie){}
+          deleteWorkDir(tracker, firstTask);
         }
       }
 
       /** 
-       * Kills the process. Also kills its subprocesses if the process(root of subtree
-       * of processes) is created using setsid.
+       * Kills the process. Also kills its subprocesses if the process(root of
+       * subtree of processes) is created using setsid.
        */
-      synchronized void kill() {
+      synchronized void kill() throws IOException, InterruptedException {
         if (!killed) {
           TaskController controller = tracker.getTaskController();
           // Check inital context before issuing a kill to prevent situations
           // where kill is issued before task is launched.
-          if (initalContext != null && initalContext.env != null) {
-            initalContext.pid = jvmIdToPid.get(jvmId);
-            initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
-                .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
-                    ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
-            // Destroy the task jvm
-            controller.destroyTaskJVM(initalContext);
+          String pidStr = jvmIdToPid.get(jvmId);
+          if (pidStr != null) {
+            String user = env.conf.getUser();
+            int pid = Integer.parseInt(pidStr);
+            // start a thread that will kill the process dead
+            if (sleeptimeBeforeSigkill > 0) {
+              controller.signalTask(user, pid, Signal.QUIT);
+              controller.signalTask(user, pid, Signal.TERM);
+              new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill,
+                  Signal.KILL, tracker.getTaskController()).start();
+            } else {
+              controller.signalTask(user, pid, Signal.KILL);
+            }
           } else {
             LOG.info(String.format("JVM Not killed %s but just removed", jvmId
                 .toString()));
@@ -508,37 +514,6 @@ class JvmManager {
         }
       }
 
-      /** Send a signal to the JVM requesting that it dump a stack trace,
-       * and wait for a timeout interval to give this signal time to be
-       * processed.
-       */
-      void dumpChildStacks() {
-        if (!killed) {
-          TaskController controller = tracker.getTaskController();
-          // Check inital context before issuing a signal to prevent situations
-          // where signal is issued before task is launched.
-          if (initalContext != null && initalContext.env != null) {
-            initalContext.pid = jvmIdToPid.get(jvmId);
-            initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
-                .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
-                    ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
-            // signal the task jvm
-            controller.dumpTaskStack(initalContext);
-
-            // We're going to kill the jvm with SIGKILL after this,
-            // so we should wait for a few seconds first to ensure that
-            // the SIGQUIT has time to be processed.
-            try {
-              Thread.sleep(initalContext.sleeptimeBeforeSigkill);
-            } catch (InterruptedException e) {
-              LOG.warn("Sleep interrupted : " +
-                  StringUtils.stringifyException(e));
-            }
-          }
-        }
-      }
-
       public void taskRan() {
         busy = false;
         numTasksRan++;
@@ -565,15 +540,13 @@ class JvmManager {
     JobConf conf;
     Map<String, String> env;
 
-    public JvmEnv(List<String> setup, Vector<String> vargs, File stdout, 
-        File stderr, long logSize, File workDir, Map<String,String> env,
-        JobConf conf) {
+    public JvmEnv(List <String> setup, Vector<String> vargs, File stdout, 
+        File stderr, long logSize, File workDir, JobConf conf) {
       this.setup = setup;
       this.vargs = vargs;
       this.stdout = stdout;
       this.stderr = stderr;
       this.workDir = workDir;
-      this.env = env;
       this.conf = conf;
     }
   }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Mar  8 05:56:27 2011
@@ -14,30 +14,27 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.PrintWriter;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.mapred.TaskController.Signal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A {@link TaskController} that runs the task JVMs as the user 
@@ -48,8 +45,8 @@ import org.apache.hadoop.util.Shell.Shel
  * JVM and killing it when needed, and also initializing and
  * finalizing the task environment. 
  * <p> The setuid executable is launched using the command line:</p>
- * <p>task-controller mapreduce.job.user.name command command-args, where</p>
- * <p>mapreduce.job.user.name is the name of the owner who submits the job</p>
+ * <p>task-controller user-name command command-args, where</p>
+ * <p>user-name is the name of the owner who submits the job</p>
  * <p>command is one of the cardinal value of the 
  * {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
  * <p>command-args depends on the command being launched.</p>
@@ -62,52 +59,78 @@ class LinuxTaskController extends TaskCo
 
   private static final Log LOG = 
             LogFactory.getLog(LinuxTaskController.class);
-
-  // Name of the executable script that will contain the child
-  // JVM command line. See writeCommand for details.
-  private static final String COMMAND_FILE = "taskjvm.sh";
   
   // Path to the setuid executable.
-  private static String taskControllerExe;
+  private String taskControllerExe;
+  private static final String TASK_CONTROLLER_EXEC_KEY =
+    "mapreduce.tasktracker.task-controller.exe";
   
-  static {
-    // the task-controller is expected to be under the $HADOOP_HOME/bin
-    // directory.
-    File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
-    taskControllerExe = 
-        new File(hadoopBin, "task-controller").getAbsolutePath();
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    taskControllerExe = getTaskControllerExecutablePath(conf);
   }
-  
+
   public LinuxTaskController() {
     super();
   }
-  
+
+  protected String getTaskControllerExecutablePath(Configuration conf) {
+    File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
+    String defaultTaskController =
+      new File(hadoopBin, "task-controller").getAbsolutePath();
+    return null == conf
+      ? defaultTaskController
+      : conf.get(TASK_CONTROLLER_EXEC_KEY, defaultTaskController);
+  }
+
   /**
    * List of commands that the setuid script will execute.
    */
-  enum TaskControllerCommands {
-    INITIALIZE_USER,
-    INITIALIZE_JOB,
-    INITIALIZE_DISTRIBUTEDCACHE_FILE,
-    LAUNCH_TASK_JVM,
-    INITIALIZE_TASK,
-    TERMINATE_TASK_JVM,
-    KILL_TASK_JVM,
-    RUN_DEBUG_SCRIPT,
-    SIGQUIT_TASK_JVM,
-    ENABLE_TASK_FOR_CLEANUP,
-    ENABLE_JOB_FOR_CLEANUP
+  enum Commands {
+    INITIALIZE_JOB(0),
+    LAUNCH_TASK_JVM(1),
+    SIGNAL_TASK(2),
+    DELETE_AS_USER(3),
+    DELETE_LOG_AS_USER(4);
+
+    private int value;
+    Commands(int value) {
+      this.value = value;
+    }
+    int getValue() {
+      return value;
+    }
+  }
+
+  /**
+   * Result codes returned from the C task-controller.
+   * These must match the values in task-controller.h.
+   */
+  enum ResultCode {
+    OK(0),
+    INVALID_USER_NAME(2),
+    INVALID_TASK_PID(9),
+    INVALID_TASKCONTROLLER_PERMISSIONS(22),
+    INVALID_CONFIG_FILE(24);
+
+    private final int value;
+    ResultCode(int value) {
+      this.value = value;
+    }
+    int getValue() {
+      return value;
+    }
   }
 
   @Override
-  public void setup() throws IOException {
-    super.setup();
-    
+  public void setup(LocalDirAllocator allocator) throws IOException {
+
     // Check the permissions of the task-controller binary by running it plainly.
-    // If permissions are correct, it returns an error code 1, else it returns 
+    // If permissions are correct, it returns an error code 1, else it returns
     // 24 or something else if some other bugs are also present.
     String[] taskControllerCmd =
-        new String[] { getTaskControllerExecutablePath() };
+        new String[] { taskControllerExe };
     ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
     try {
       shExec.execute();
@@ -120,52 +143,96 @@ class LinuxTaskController extends TaskCo
           + "permissions/ownership with exit code " + exitCode, e);
       }
     }
+    this.allocator = allocator;
+  }
+  
+
+  @Override
+  public void initializeJob(String user, String jobid, Path credentials,
+                            Path jobConf, TaskUmbilicalProtocol taskTracker,
+                            InetSocketAddress ttAddr
+                            ) throws IOException, InterruptedException {
+    List<String> command = new ArrayList<String>(
+      Arrays.asList(taskControllerExe, 
+                    user, 
+                    Integer.toString(Commands.INITIALIZE_JOB.getValue()),
+                    jobid,
+                    credentials.toUri().getPath().toString(),
+                    jobConf.toUri().getPath().toString()));
+    File jvm =                                  // use same jvm as parent
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+    command.add(jvm.toString());
+    command.add("-classpath");
+    command.add(System.getProperty("java.class.path"));
+    command.add("-Dhadoop.log.dir=" + TaskLog.getBaseLogDir());
+    command.add("-Dhadoop.root.logger=INFO,console");
+    command.add(JobLocalizer.class.getName());  // main of JobLocalizer
+    command.add(user);
+    command.add(jobid);
+    // add the task tracker's reporting address
+    command.add(ttAddr.getHostName());
+    command.add(Integer.toString(ttAddr.getPort()));
+    String[] commandArray = command.toArray(new String[0]);
+    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("initializeJob: " + Arrays.toString(commandArray));
+    }
+    try {
+      shExec.execute();
+      if (LOG.isDebugEnabled()) {
+        logOutput(shExec.getOutput());
+      }
+    } catch (ExitCodeException e) {
+      int exitCode = shExec.getExitCode();
+      logOutput(shExec.getOutput());
+      throw new IOException("Job initialization failed (" + exitCode + ")", e);
+    }
   }
 
-  /**
-   * Launch a task JVM that will run as the owner of the job.
-   * 
-   * This method launches a task JVM by executing a setuid executable that will
-   * switch to the user and run the task. Also does initialization of the first
-   * task in the same setuid process launch.
-   */
   @Override
-  void launchTaskJVM(TaskController.TaskControllerContext context) 
-                                        throws IOException {
-    JvmEnv env = context.env;
-    // get the JVM command line.
-    String cmdLine = 
-      TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
-          env.logSize, true);
-
-    StringBuffer sb = new StringBuffer();
-    //export out all the environment variable before child command as
-    //the setuid/setgid binaries would not be getting, any environmental
-    //variables which begin with LD_*.
-    for(Entry<String, String> entry : env.env.entrySet()) {
-      sb.append("export ");
-      sb.append(entry.getKey());
-      sb.append("=");
-      sb.append(entry.getValue());
-      sb.append("\n");
-    }
-    sb.append(cmdLine);
-    // write the command to a file in the
-    // task specific cache directory
-    writeCommand(sb.toString(), getTaskCacheDirectory(context, 
-        context.env.workDir));
-    
-    // Call the taskcontroller with the right parameters.
-    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, 
-        context.env.workDir);
-    ShellCommandExecutor shExec =  buildTaskControllerExecutor(
-                                    TaskControllerCommands.LAUNCH_TASK_JVM, 
-                                    env.conf.getUser(),
-                                    launchTaskJVMArgs, env.workDir, env.env);
-    context.shExec = shExec;
+  public int launchTask(String user, 
+                                  String jobId,
+                                  String attemptId,
+                                  List<String> setup,
+                                  List<String> jvmArguments,
+                                  File currentWorkDirectory,
+                                  String stdout,
+                                  String stderr) throws IOException {
+
+    ShellCommandExecutor shExec = null;
     try {
+      FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
+      long logSize = 0; //TODO, Ref BUG:2854624
+      // get the JVM command line.
+      String cmdLine = 
+        TaskLog.buildCommandLine(setup, jvmArguments,
+            new File(stdout), new File(stderr), logSize, true);
+
+      // write the command to a file in the
+      // task specific cache directory
+      Path p = new Path(allocator.getLocalPathForWrite(
+          TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
+          getConf()), COMMAND_FILE);
+      String commandFile = writeCommand(cmdLine, rawFs, p); 
+
+      String[] command = 
+        new String[]{taskControllerExe, 
+          user,
+          Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
+          jobId,
+          attemptId,
+          currentWorkDirectory.toString(),
+          commandFile};
+      shExec = new ShellCommandExecutor(command);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("launchTask: " + Arrays.toString(command));
+      }
       shExec.execute();
     } catch (Exception e) {
+      if (shExec == null) {
+        return -1;
+      }
       int exitCode = shExec.getExitCode();
       LOG.warn("Exit code from task is : " + exitCode);
       // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
@@ -177,476 +244,79 @@ class LinuxTaskController extends TaskCo
         LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
         logOutput(shExec.getOutput());
       }
-      throw new IOException(e);
+      return exitCode;
     }
     if (LOG.isDebugEnabled()) {
-      LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+      LOG.debug("Output from LinuxTaskController's launchTask follows:");
       logOutput(shExec.getOutput());
     }
+    return 0;
   }
-  
-  /**
-   * Launch the debug script process that will run as the owner of the job.
-   * 
-   * This method launches the task debug script process by executing a setuid
-   * executable that will switch to the user and run the task. 
-   */
+
   @Override
-  void runDebugScript(DebugScriptContext context) throws IOException {
-    String debugOut = FileUtil.makeShellPath(context.stdout);
-    String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
-    writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
-    // Call the taskcontroller with the right parameters.
-    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
-    runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(), 
-        launchTaskJVMArgs, context.workDir, null);
-  }
-  /**
-   * Helper method that runs a LinuxTaskController command
-   * 
-   * @param taskControllerCommand
-   * @param user
-   * @param cmdArgs
-   * @param env
-   * @throws IOException
-   */
-  private void runCommand(TaskControllerCommands taskControllerCommand, 
-      String user, List<String> cmdArgs, File workDir, Map<String, String> env)
-      throws IOException {
-
-    ShellCommandExecutor shExec =
-        buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs, 
-                                    workDir, env);
-    try {
-      shExec.execute();
-    } catch (Exception e) {
-      LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
-          + shExec.getExitCode());
-      LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
-          + StringUtils.stringifyException(e));
-      LOG.info("Output from LinuxTaskController's " 
-               + taskControllerCommand.toString() + " follows:");
-      logOutput(shExec.getOutput());
-      throw new IOException(e);
+  public void deleteAsUser(String user, String subDir, String... baseDirs) 
+  throws IOException {
+    List<String> command = new ArrayList<String>(
+        Arrays.asList(
+                   taskControllerExe, 
+                   user,
+                   Integer.toString(Commands.DELETE_AS_USER.getValue()),
+                   subDir));
+    for (String baseDir : baseDirs) {
+      command.add(baseDir);
     }
+    String[] commandArray = command.toArray(new String[0]);
+    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
     if (LOG.isDebugEnabled()) {
-      LOG.info("Output from LinuxTaskController's " 
-               + taskControllerCommand.toString() + " follows:");
-      logOutput(shExec.getOutput());
-    }
-  }
-
-  /**
-   * Returns list of arguments to be passed while initializing a new task. See
-   * {@code buildTaskControllerExecutor(TaskControllerCommands, String, 
-   * List<String>, JvmEnv)} documentation.
-   * 
-   * @param context
-   * @return Argument to be used while launching Task VM
-   */
-  private List<String> buildInitializeTaskArgs(TaskExecContext context) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    String taskId = context.task.getTaskID().toString();
-    String jobId = getJobId(context);
-    commandArgs.add(jobId);
-    if (!context.task.isTaskCleanupTask()) {
-      commandArgs.add(taskId);
-    } else {
-      commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+      LOG.debug("deleteAsUser: " + Arrays.toString(commandArray));
     }
-    return commandArgs;
+    shExec.execute();
   }
 
   @Override
-  void initializeTask(TaskControllerContext context)
-      throws IOException {
+  public void deleteLogAsUser(String user, String subDir) throws IOException {
+    String[] command = 
+      new String[]{taskControllerExe, 
+                   user,
+                   Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
+                   subDir};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to do " 
-                + TaskControllerCommands.INITIALIZE_TASK.toString()
-                + " for " + context.task.getTaskID().toString());
-    }
-    runCommand(TaskControllerCommands.INITIALIZE_TASK, 
-        context.env.conf.getUser(),
-        buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
-  }
-
-  /**
-   * Builds the args to be passed to task-controller for enabling of task for
-   * cleanup. Last arg in this List is either $attemptId or $attemptId/work
-   */
-  private List<String> buildTaskCleanupArgs(
-      TaskControllerTaskPathDeletionContext context) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    commandArgs.add(context.mapredLocalDir.toUri().getPath());
-    commandArgs.add(context.task.getJobID().toString());
-
-    String workDir = "";
-    if (context.isWorkDir) {
-      workDir = "/work";
-    }
-    if (context.task.isTaskCleanupTask()) {
-      commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
-                      + workDir);
-    } else {
-      commandArgs.add(context.task.getTaskID() + workDir);
+      LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
     }
-
-    return commandArgs;
+    shExec.execute();
   }
 
-  /**
-   * Builds the args to be passed to task-controller for enabling of job for
-   * cleanup. Last arg in this List is $jobid.
-   */
-  private List<String> buildJobCleanupArgs(
-      TaskControllerJobPathDeletionContext context) {
-    List<String> commandArgs = new ArrayList<String>(2);
-    commandArgs.add(context.mapredLocalDir.toUri().getPath());
-    commandArgs.add(context.jobId.toString());
-
-    return commandArgs;
-  }
-  
-  /**
-   * Enables the task for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
-  @Override
-  void enableTaskForCleanup(PathDeletionContext context)
-      throws IOException {
-    if (context instanceof TaskControllerTaskPathDeletionContext) {
-      TaskControllerTaskPathDeletionContext tContext =
-        (TaskControllerTaskPathDeletionContext) context;
-      enablePathForCleanup(tContext, 
-                           TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
-                           buildTaskCleanupArgs(tContext));
-    }
-    else {
-      throw new IllegalArgumentException("PathDeletionContext provided is not "
-          + "TaskControllerTaskPathDeletionContext.");
-    }
-  }
-
-  /**
-   * Enables the job for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
   @Override
-  void enableJobForCleanup(PathDeletionContext context)
-      throws IOException {
-    if (context instanceof TaskControllerJobPathDeletionContext) {
-      TaskControllerJobPathDeletionContext tContext =
-        (TaskControllerJobPathDeletionContext) context;
-      enablePathForCleanup(tContext, 
-                           TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
-                           buildJobCleanupArgs(tContext));
-    } else {
-      throw new IllegalArgumentException("PathDeletionContext provided is not "
-                  + "TaskControllerJobPathDeletionContext.");
-    }
-  }
-  
-  /**
-   * Enable a path for cleanup
-   * @param c {@link TaskControllerPathDeletionContext} for the path to be 
-   *          cleaned up
-   * @param command {@link TaskControllerCommands} for task/job cleanup
-   * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable 
-   *                    path cleanup
-   */
-  private void enablePathForCleanup(TaskControllerPathDeletionContext c,
-                                    TaskControllerCommands command,
-                                    List<String> cleanupArgs) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
-    }
-
-    if ( c.user != null && c.fs instanceof LocalFileSystem) {
-      try {
-        runCommand(command, c.user, cleanupArgs, null, null);
-      } catch(IOException e) {
-        LOG.warn("Unable to change permissions for " + c.fullPath);
-      }
-    }
-    else {
-      throw new IllegalArgumentException("Either user is null or the " 
-                  + "file system is not local file system.");
-    }
-  }
-
-  private void logOutput(String output) {
-    String shExecOutput = output;
-    if (shExecOutput != null) {
-      for (String str : shExecOutput.split("\n")) {
-        LOG.info(str);
-      }
-    }
-  }
-
-  private String getJobId(TaskExecContext context) {
-    String taskId = context.task.getTaskID().toString();
-    TaskAttemptID tId = TaskAttemptID.forName(taskId);
-    String jobId = tId.getJobID().toString();
-    return jobId;
-  }
-
-  /**
-   * Returns list of arguments to be passed while launching task VM.
-   * See {@code buildTaskControllerExecutor(TaskControllerCommands, 
-   * String, List<String>, JvmEnv)} documentation.
-   * @param context
-   * @return Argument to be used while launching Task VM
-   */
-  private List<String> buildLaunchTaskArgs(TaskExecContext context, 
-      File workDir) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    LOG.debug("getting the task directory as: " 
-        + getTaskCacheDirectory(context, workDir));
-    LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context, workDir)), 
-        context) );
-    commandArgs.add(getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context, workDir)), 
-        context));
-    commandArgs.addAll(buildInitializeTaskArgs(context));
-    return commandArgs;
-  }
-
-  // Get the directory from the list of directories configured
-  // in Configs.LOCAL_DIR chosen for storing data pertaining to
-  // this task.
-  private String getDirectoryChosenForTask(File directory,
-      TaskExecContext context) {
-    String jobId = getJobId(context);
-    String taskId = context.task.getTaskID().toString();
-    for (String dir : mapredLocalDirs) {
-      File mapredDir = new File(dir);
-      File taskDir =
-          new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
-              .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
-              .getParentFile();
-      if (directory.equals(taskDir)) {
-        return dir;
-      }
-    }
-
-    LOG.error("Couldn't parse task cache directory correctly");
-    throw new IllegalArgumentException("invalid task cache directory "
-        + directory.getAbsolutePath());
-  }
-
-  /**
-   * Builds the command line for launching/terminating/killing task JVM.
-   * Following is the format for launching/terminating/killing task JVM
-   * <br/>
-   * For launching following is command line argument:
-   * <br/>
-   * {@code mapreduce.job.user.name command tt-root job_id task_id} 
-   * <br/>
-   * For terminating/killing task jvm.
-   * {@code mapreduce.job.user.name command tt-root task-pid}
-   * 
-   * @param command command to be executed.
-   * @param userName mapreduce.job.user.name
-   * @param cmdArgs list of extra arguments
-   * @param workDir working directory for the task-controller
-   * @param env JVM environment variables.
-   * @return {@link ShellCommandExecutor}
-   * @throws IOException
-   */
-  private ShellCommandExecutor buildTaskControllerExecutor(
-      TaskControllerCommands command, String userName, List<String> cmdArgs,
-      File workDir, Map<String, String> env)
-      throws IOException {
-    String[] taskControllerCmd = new String[3 + cmdArgs.size()];
-    taskControllerCmd[0] = getTaskControllerExecutablePath();
-    taskControllerCmd[1] = userName;
-    taskControllerCmd[2] = String.valueOf(command.ordinal());
-    int i = 3;
-    for (String cmdArg : cmdArgs) {
-      taskControllerCmd[i++] = cmdArg;
-    }
+  public boolean signalTask(String user, int taskPid, 
+                         Signal signal) throws IOException {
+    String[] command = 
+      new String[]{taskControllerExe, 
+                   user,
+                   Integer.toString(Commands.SIGNAL_TASK.getValue()),
+                   Integer.toString(taskPid),
+                   Integer.toString(signal.getValue())};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
     if (LOG.isDebugEnabled()) {
-      for (String cmd : taskControllerCmd) {
-        LOG.debug("taskctrl command = " + cmd);
-      }
-    }
-    ShellCommandExecutor shExec = null;
-    if(workDir != null && workDir.exists()) {
-      shExec = new ShellCommandExecutor(taskControllerCmd,
-          workDir, env);
-    } else {
-      shExec = new ShellCommandExecutor(taskControllerCmd);
-    }
-    
-    return shExec;
-  }
-  
-  // Return the task specific directory under the cache.
-  private String getTaskCacheDirectory(TaskExecContext context, 
-      File workDir) {
-    // In the case of JVM reuse, the task specific directory
-    // is different from what is set with respect with
-    // env.workDir. Hence building this from the taskId everytime.
-    String taskId = context.task.getTaskID().toString();
-    File cacheDirForJob = workDir.getParentFile().getParentFile();
-    if(context.task.isTaskCleanupTask()) {
-      taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
-    }
-    return new File(cacheDirForJob, taskId).getAbsolutePath(); 
-  }
-  
-  // Write the JVM command line to a file under the specified directory
-  // Note that the JVM will be launched using a setuid executable, and
-  // could potentially contain strings defined by a user. Hence, to
-  // prevent special character attacks, we write the command line to
-  // a file and execute it.
-  private void writeCommand(String cmdLine, 
-                                      String directory) throws IOException {
-    
-    PrintWriter pw = null;
-    String commandFile = directory + File.separator + COMMAND_FILE;
-    LOG.info("Writing commands to " + commandFile);
-    LOG.info("--------Commands Begin--------");
-    LOG.info(cmdLine);
-    LOG.info("--------Commands End--------");
-    try {
-      FileWriter fw = new FileWriter(commandFile);
-      BufferedWriter bw = new BufferedWriter(fw);
-      pw = new PrintWriter(bw);
-      pw.write(cmdLine);
-    } catch (IOException ioe) {
-      LOG.error("Caught IOException while writing JVM command line to file. "
-                + ioe.getMessage());
-    } finally {
-      if (pw != null) {
-        pw.close();
-      }
-      // set execute permissions for all on the file.
-      File f = new File(commandFile);
-      if (f.exists()) {
-        f.setReadable(true, false);
-        f.setExecutable(true, false);
-      }
+      LOG.debug("signalTask: " + Arrays.toString(command));
     }
-  }
-
-  private List<String> buildInitializeJobCommandArgs(
-      JobInitializationContext context) {
-    List<String> initJobCmdArgs = new ArrayList<String>();
-    initJobCmdArgs.add(context.jobid.toString());
-    return initJobCmdArgs;
-  }
-
-  @Override
-  void initializeJob(JobInitializationContext context)
-      throws IOException {
-    LOG.debug("Going to initialize job " + context.jobid.toString()
-        + " on the TT");
-    runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
-        buildInitializeJobCommandArgs(context), context.workDir, null);
-  }
-
-  @Override
-  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
-      throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to initialize distributed cache for " + context.user
-          + " with localizedBaseDir " + context.localizedBaseDir + 
-          " and uniqueString " + context.uniqueString);
-    }
-    List<String> args = new ArrayList<String>();
-    // Here, uniqueString might start with '-'. Adding -- in front of the 
-    // arguments indicates that they are non-option parameters.
-    args.add("--");
-    args.add(context.localizedBaseDir.toString());
-    args.add(context.uniqueString);
-    runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, 
-        context.user, args, context.workDir, null);
-  }
-
-  @Override
-  public void initializeUser(InitializationContext context)
-      throws IOException {
-    LOG.debug("Going to initialize user directories for " + context.user
-        + " on the TT");
-    runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
-        new ArrayList<String>(), context.workDir, null);
-  }
-
-  /**
-   * API which builds the command line to be pass to LinuxTaskController
-   * binary to terminate/kill the task. See 
-   * {@code buildTaskControllerExecutor(TaskControllerCommands, 
-   * String, List<String>, JvmEnv)} documentation.
-   * 
-   * 
-   * @param context context of task which has to be passed kill signal.
-   * 
-   */
-  private List<String> buildKillTaskCommandArgs(TaskControllerContext 
-      context){
-    List<String> killTaskJVMArgs = new ArrayList<String>();
-    killTaskJVMArgs.add(context.pid);
-    return killTaskJVMArgs;
-  }
-  
-  /**
-   * Convenience method used to sending appropriate signal to the task
-   * VM
-   * @param context
-   * @param command
-   * @throws IOException
-   */
-  protected void signalTask(TaskControllerContext context,
-      TaskControllerCommands command) throws IOException{
-    if(context.task == null) {
-      LOG.info("Context task is null; not signaling the JVM");
-      return;
-    }
-    ShellCommandExecutor shExec = buildTaskControllerExecutor(
-        command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env.workDir,
-        context.env.env);
     try {
       shExec.execute();
-    } catch (Exception e) {
-      LOG.warn("Output from task-contoller is : " + shExec.getOutput());
-      throw new IOException(e);
-    }
-  }
-  
-  @Override
-  void terminateTask(TaskControllerContext context) {
-    try {
-      signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
-    } catch (Exception e) {
-      LOG.warn("Exception thrown while sending kill to the Task VM " + 
-          StringUtils.stringifyException(e));
-    }
-  }
-  
-  @Override
-  void killTask(TaskControllerContext context) {
-    try {
-      signalTask(context, TaskControllerCommands.KILL_TASK_JVM);
-    } catch (Exception e) {
-      LOG.warn("Exception thrown while sending destroy to the Task VM " + 
-          StringUtils.stringifyException(e));
+    } catch (ExitCodeException e) {
+      int ret_code = shExec.getExitCode();
+      if (ret_code == ResultCode.INVALID_TASK_PID.getValue()) {
+        return false;
+      }
+      logOutput(shExec.getOutput());
+      throw new IOException("Problem signalling task " + taskPid + " with " +
+                            signal + "; exit = " + ret_code);
     }
+    return true;
   }
 
   @Override
-  void dumpTaskStack(TaskControllerContext context) {
-    try {
-      signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
-    } catch (Exception e) {
-      LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
-          StringUtils.stringifyException(e));
-    }
-  }
-
-  protected String getTaskControllerExecutablePath() {
-    return taskControllerExe;
+  public String getRunAsUser(JobConf conf) {
+    return conf.getUser();
   }
 }
+

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Mar  8 05:56:27 2011
@@ -38,7 +38,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
@@ -79,6 +78,7 @@ public class LocalJobRunner implements C
   private AtomicInteger map_tasks = new AtomicInteger(0);
   private int reduce_tasks = 0;
   final Random rand = new Random();
+  private final TaskController taskController = new DefaultTaskController();
   
   private JobTrackerInstrumentation myMetrics = null;
 
@@ -118,7 +118,7 @@ public class LocalJobRunner implements C
     private FileSystem localFs;
     boolean killed = false;
     
-    private TrackerDistributedCacheManager trackerDistributerdCacheManager;
+    private TrackerDistributedCacheManager trackerDistributedCacheManager;
     private TaskDistributedCacheManager taskDistributedCacheManager;
 
     public long getProtocolVersion(String protocol, long clientVersion) {
@@ -136,14 +136,12 @@ public class LocalJobRunner implements C
 
       // Manage the distributed cache.  If there are files to be copied,
       // this will trigger localFile to be re-written again.
-      this.trackerDistributerdCacheManager =
-          new TrackerDistributedCacheManager(conf, new DefaultTaskController());
+      this.trackerDistributedCacheManager =
+        new TrackerDistributedCacheManager(conf);
       this.taskDistributedCacheManager = 
-          trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
-      taskDistributedCacheManager.setup(
-          new LocalDirAllocator(MRConfig.LOCAL_DIR), 
-          new File(systemJobDir.toString()),
-          "archive", "archive");
+        trackerDistributedCacheManager.newTaskDistributedCacheManager(
+            jobid, conf);
+      taskDistributedCacheManager.setupCache(conf, "archive", "archive");
       
       if (DistributedCache.getSymlink(conf)) {
         // This is not supported largely because, 
@@ -460,7 +458,7 @@ public class LocalJobRunner implements C
           localFs.delete(localJobFile, true);              // delete local copy
           // Cleanup distributed cache
           taskDistributedCacheManager.release();
-          trackerDistributerdCacheManager.purgeCache();
+          trackerDistributedCacheManager.purgeCache();
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
         }
@@ -534,6 +532,14 @@ public class LocalJobRunner implements C
     public boolean ping(TaskAttemptID taskid) throws IOException {
       return true;
     }
+
+    @Override
+    public void updatePrivateDistributedCacheSizes(
+                                                   org.apache.hadoop.mapreduce.JobID jobId,
+                                                   long[] sizes)
+                                                                throws IOException {
+      trackerDistributedCacheManager.setArchiveSizes(jobId, sizes);
+    }
     
     public boolean canCommit(TaskAttemptID taskid) 
     throws IOException {
@@ -580,6 +586,7 @@ public class LocalJobRunner implements C
     this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
     myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
+    taskController.setConf(conf);
   }
 
   // JobSubmissionProtocol methods

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTask.java Tue Mar  8 05:56:27 2011
@@ -155,8 +155,10 @@ class MapTask extends Task {
   
   @Override
   public TaskRunner createRunner(TaskTracker tracker, 
-      TaskTracker.TaskInProgress tip) {
-    return new MapTaskRunner(tip, tracker, this.conf);
+                                 TaskTracker.TaskInProgress tip,
+                                 TaskTracker.RunningJob rjob
+                                 ) throws IOException {
+    return new MapTaskRunner(tip, tracker, this.conf, rjob);
   }
 
   @Override

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Tue Mar  8 05:56:27 2011
@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.log4j.Level;
 
 /** Runs a map task. */
 class MapTaskRunner extends TaskRunner {
   
-  public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) {
-    super(task, tracker, conf);
+  public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf,
+      TaskTracker.RunningJob rjob) throws IOException {
+    super(task, tracker, conf, rjob);
   }
   
   @Override

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Mar  8 05:56:27 2011
@@ -146,9 +146,10 @@ public class ReduceTask extends Task {
   }
 
   @Override
-  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip) 
-  throws IOException {
-    return new ReduceTaskRunner(tip, tracker, this.conf);
+  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+                                 TaskTracker.RunningJob rjob
+                                 ) throws IOException {
+    return new ReduceTaskRunner(tip, tracker, this.conf, rjob);
   }
 
   @Override

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Mar  8 05:56:27 2011
@@ -26,9 +26,10 @@ import org.apache.log4j.Level;
 class ReduceTaskRunner extends TaskRunner {
 
   public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker, 
-                          JobConf conf) throws IOException {
+                          JobConf conf, TaskTracker.RunningJob rjob
+                          ) throws IOException {
     
-    super(task, tracker, conf);
+    super(task, tracker, conf, rjob);
   }
 
   public void close() throws IOException {

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar  8 05:56:27 2011
@@ -458,7 +458,9 @@ abstract public class Task implements Wr
   /** Return an approprate thread runner for this task. 
    * @param tip TODO*/
   public abstract TaskRunner createRunner(TaskTracker tracker, 
-      TaskTracker.TaskInProgress tip) throws IOException;
+                                          TaskTracker.TaskInProgress tip, 
+                                          TaskTracker.RunningJob rjob
+                                          ) throws IOException;
 
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 3000;

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskController.java Tue Mar  8 05:56:27 2011
@@ -14,11 +14,13 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -26,15 +28,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.fs.permission.FsPermission;
+
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * Controls initialization, finalization and clean up of tasks, and
@@ -46,392 +45,250 @@ import org.apache.hadoop.classification.
  * performing the actual actions.
  * 
  * <br/>
+ * 
+ * NOTE: This class is internal only class and not intended for users!!
  */
-@InterfaceAudience.Private
 public abstract class TaskController implements Configurable {
   
+  /**
+   * The constants for the signals.
+   */
+  public enum Signal {
+    NULL(0, "NULL"), QUIT(3, "SIGQUIT"), 
+    KILL(9, "SIGKILL"), TERM(15, "SIGTERM");
+    private final int value;
+    private final String str;
+    private Signal(int value, String str) {
+      this.str = str;
+      this.value = value;
+    }
+    public int getValue() {
+      return value;
+    }
+    @Override
+    public String toString() {
+      return str;
+    }
+  }
+
   private Configuration conf;
-  
+
   public static final Log LOG = LogFactory.getLog(TaskController.class);
   
+  //Name of the executable script that will contain the child
+  // JVM command line. See writeCommand for details.
+  protected static final String COMMAND_FILE = "taskjvm.sh";
+  
+  protected LocalDirAllocator allocator;
+
+  final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
+  FsPermission.createImmutable((short) 0700); // rwx--------
+  
   public Configuration getConf() {
     return conf;
   }
 
-  // The list of directory paths specified in the variable Configs.LOCAL_DIR
-  // This is used to determine which among the list of directories is picked up
-  // for storing data for a particular task.
-  protected String[] mapredLocalDirs;
-
   public void setConf(Configuration conf) {
     this.conf = conf;
-    mapredLocalDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
   }
 
   /**
-   * Sets up the permissions of the following directories on all the configured
-   * disks:
-   * <ul>
-   * <li>mapreduce.cluster.local.directories</li>
-   * <li>Hadoop log directories</li>
-   * </ul>
+   * Does initialization and setup.
+   * @param allocator the local dir allocator to use
    */
-  public void setup() throws IOException {
-    for (String localDir : this.mapredLocalDirs) {
-      // Set up the mapreduce.cluster.local.directories.
-      File mapredlocalDir = new File(localDir);
-      if (!mapredlocalDir.exists() && !mapredlocalDir.mkdirs()) {
-        LOG.warn("Unable to create mapreduce.cluster.local.directory : "
-            + mapredlocalDir.getPath());
-      } else {
-        Localizer.PermissionsHandler.setPermissions(mapredlocalDir,
-            Localizer.PermissionsHandler.sevenFiveFive);
-      }
-    }
-
-    // Set up the user log directory
-    File taskLog = TaskLog.getUserLogDir();
-    if (!taskLog.exists() && !taskLog.mkdirs()) {
-      LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
-    } else {
-      Localizer.PermissionsHandler.setPermissions(taskLog,
-          Localizer.PermissionsHandler.sevenFiveFive);
-    }
-  }
-
+  public abstract void setup(LocalDirAllocator allocator) throws IOException;
+  
   /**
-   * Take task-controller specific actions to initialize job. This involves
-   * setting appropriate permissions to job-files so as to secure the files to
-   * be accessible only by the user's tasks.
-   * 
+   * Create all of the directories necessary for the job to start and download
+   * all of the job and private distributed cache files.
+   * Creates both the user directories and the job log directory.
+   * @param user the user name
+   * @param jobid the job
+   * @param credentials a filename containing the job secrets
+   * @param jobConf the path to the localized configuration file
+   * @param taskTracker the connection the task tracker
+   * @param ttAddr the tasktracker's RPC address
    * @throws IOException
    */
-  abstract void initializeJob(JobInitializationContext context) throws IOException;
-
+  public abstract void initializeJob(String user, String jobid, 
+                                     Path credentials, Path jobConf,
+                                     TaskUmbilicalProtocol taskTracker,
+                                     InetSocketAddress ttAddr) 
+  throws IOException, InterruptedException;
+  
   /**
-   * Take task-controller specific actions to initialize the distributed cache
-   * file. This involves setting appropriate permissions for these files so as
-   * to secure them to be accessible only their owners.
-   * 
-   * @param context
+   * Create all of the directories for the task and launches the child jvm.
+   * @param user the user name
+   * @param jobId the jobId in question
+   * @param attemptId the attempt id (cleanup attempts have .cleanup suffix)
+   * @param setup list of shell commands to execute before the jvm
+   * @param jvmArguments list of jvm arguments
+   * @param currentWorkDirectory the full path of the cwd for the task
+   * @param stdout the file to redirect stdout to
+   * @param stderr the file to redirect stderr to
+   * @return the exit code for the task
    * @throws IOException
    */
-  public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
-      throws IOException;
-
-  /**
-   * Launch a task JVM
-   * 
-   * This method defines how a JVM will be launched to run a task. Each
-   * task-controller should also do an
-   * {@link #initializeTask(TaskControllerContext)} inside this method so as to
-   * initialize the task before launching it. This is for reasons of
-   * task-controller specific optimizations w.r.t combining initialization and
-   * launching of tasks.
-   * 
-   * @param context the context associated to the task
-   */
-  abstract void launchTaskJVM(TaskControllerContext context)
-                                      throws IOException;
-
+  public abstract
+  int launchTask(String user, 
+                 String jobId,
+                 String attemptId,
+                 List<String> setup,
+                 List<String> jvmArguments,
+                 File currentWorkDirectory,
+                 String stdout,
+                 String stderr) throws IOException;
+  
   /**
-   * Top level cleanup a task JVM method.
-   * <ol>
-   * <li>Sends a graceful termiante signal to task JVM to allow subprocesses
-   * to cleanup.</li>
-   * <li>Sends a forceful kill signal to task JVM, terminating all its
-   * sub-processes forcefully.</li>
-   * </ol>
-   *
-   * @param context the task for which kill signal has to be sent.
+   * Send a signal to a task pid as the user. Always signal the process group.
+   * An implementation may elect to signal the pid directly if the former is
+   * unavailable or fails.
+   * @param user the user name
+   * @param taskPid the pid of the task
+   * @param signal the id of the signal to send
+   * @return false if the process does not exist
+   * @throws IOException If the task controller failed to signal the process
+   * (group), but the process exists.
    */
-  final void destroyTaskJVM(TaskControllerContext context) {
-    // Send SIGTERM to try to ask for a polite exit.
-    terminateTask(context);
-
-    try {
-      Thread.sleep(context.sleeptimeBeforeSigkill);
-    } catch (InterruptedException e) {
-      LOG.warn("Sleep interrupted : " +
-          StringUtils.stringifyException(e));
-    }
-
-    killTask(context);
-  }
-
-  /** Perform initializing actions required before a task can run.
-    * 
-    * For instance, this method can be used to setup appropriate
-    * access permissions for files and directories that will be
-    * used by tasks. Tasks use the job cache, log, and distributed cache
-    * directories and files as part of their functioning. Typically,
-    * these files are shared between the daemon and the tasks
-    * themselves. So, a TaskController that is launching tasks
-    * as different users can implement this method to setup
-    * appropriate ownership and permissions for these directories
-    * and files.
-    */
-  abstract void initializeTask(TaskControllerContext context)
-      throws IOException;
-
-  static class TaskExecContext {
-    // task being executed
-    Task task;
-  }
+  public abstract boolean signalTask(String user, int taskPid, 
+                                  Signal signal) throws IOException;
+  
   /**
-   * Contains task information required for the task controller.  
+   * Delete the user's files under all of the task tracker root directories.
+   * @param user the user name
+   * @param subDir the path relative to base directories
+   * @param baseDirs the base directories (absolute paths)
+   * @throws IOException
    */
-  static class TaskControllerContext extends TaskExecContext {
-    ShellCommandExecutor shExec;     // the Shell executor executing the JVM for this task.
-
-    // Information used only when this context is used for launching new tasks.
-    JvmEnv env;     // the JVM environment for the task.
-
-    // Information used only when this context is used for destroying a task jvm.
-    String pid; // process handle of task JVM.
-    long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
-  }
-
+  public abstract void deleteAsUser(String user, 
+                                    String subDir,
+                                    String... baseDirs) throws IOException;
+  
   /**
-   * Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the file/dir
+   * Delete the user's files under the userlogs directory.
+   * @param user the user to work as
+   * @param subDir the path under the userlogs directory.
+   * @throws IOException
    */
-  static abstract class TaskControllerPathDeletionContext 
-  extends PathDeletionContext {
-    TaskController taskController;
-    String user;
-
-    /**
-     * mapredLocalDir is the base dir under which to-be-deleted jobLocalDir, 
-     * taskWorkDir or taskAttemptDir exists. fullPath of jobLocalDir, 
-     * taskAttemptDir or taskWorkDir is built using mapredLocalDir, jobId, 
-     * taskId, etc.
-     */
-    Path mapredLocalDir;
-
-    public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
-                                             TaskController taskController,
-                                             String user) {
-      super(fs, null);
-      this.taskController = taskController;
-      this.mapredLocalDir = mapredLocalDir;
+  public abstract void deleteLogAsUser(String user, 
+                                       String subDir) throws IOException;
+  
+  static class DeletionContext extends CleanupQueue.PathDeletionContext {
+    private TaskController controller;
+    private boolean isLog;
+    private String user;
+    private String subDir;
+    private String[] baseDirs;
+    DeletionContext(TaskController controller, boolean isLog, String user, 
+                    String subDir, String[] baseDirs) {
+      super(null, null);
+      this.controller = controller;
+      this.isLog = isLog;
       this.user = user;
+      this.subDir = subDir;
+      this.baseDirs = baseDirs;
     }
-
+    
     @Override
-    protected String getPathForCleanup() {
-      if (fullPath == null) {
-        fullPath = buildPathForDeletion();
+    protected void deletePath() throws IOException {
+      if (isLog) {
+        controller.deleteLogAsUser(user, subDir);
+      } else {
+        controller.deleteAsUser(user, subDir, baseDirs);
       }
-      return fullPath;
     }
 
-    /**
-     * Return the component of the path under the {@link #mapredLocalDir} to be 
-     * cleaned up. Its the responsibility of the class that extends 
-     * {@link TaskControllerPathDeletionContext} to provide the correct 
-     * component. For example 
-     *  - For task related cleanups, either the task-work-dir or task-local-dir
-     *    might be returned depending on jvm reuse.
-     *  - For job related cleanup, simply the job-local-dir might be returned.
-     */
-    abstract protected String getPath();
-    
-    /**
-     * Builds the path of taskAttemptDir OR taskWorkDir based on
-     * mapredLocalDir, jobId, taskId, etc
-     */
-    String buildPathForDeletion() {
-      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + getPath();
+    @Override
+    public String toString() {
+      return (isLog ? "log(" : "dir(") +
+        user + "," + subDir + ")";
     }
   }
-
-  /** Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the task-work-dir or
-   * task-local-dir depending on whether the jvm is reused or not.
-   */
-  static class TaskControllerTaskPathDeletionContext 
-  extends TaskControllerPathDeletionContext {
-    final Task task;
-    final boolean isWorkDir;
-    
-    public TaskControllerTaskPathDeletionContext(FileSystem fs, 
-        Path mapredLocalDir, Task task, boolean isWorkDir, 
-        TaskController taskController) {
-      super(fs, mapredLocalDir, taskController, task.getUser());
-      this.task = task;
-      this.isWorkDir = isWorkDir;
-    }
-    
-    /**
-     * Returns the taskWorkDir or taskLocalDir based on whether 
-     * {@link TaskControllerTaskPathDeletionContext} is configured to delete
-     * the workDir.
-     */
-    @Override
-    protected String getPath() {
-      String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
-          task.getJobID().toString(), task.getTaskID().toString(),
-          task.isTaskCleanupTask())
-        : TaskTracker.getLocalTaskDir(task.getUser(),
-          task.getJobID().toString(), task.getTaskID().toString(),
-          task.isTaskCleanupTask());
-      return subDir;
+  
+   /**
+    * Returns the local unix user that a given job will run as.
+    */
+   public String getRunAsUser(JobConf conf) {
+     return System.getProperty("user.name");
+   }
+
+  //Write the JVM command line to a file under the specified directory
+  // Note that the JVM will be launched using a setuid executable, and
+  // could potentially contain strings defined by a user. Hence, to
+  // prevent special character attacks, we write the command line to
+  // a file and execute it.
+  protected static String writeCommand(String cmdLine, FileSystem fs,
+      Path commandFile) throws IOException {
+    PrintWriter pw = null;
+    LOG.info("Writing commands to " + commandFile);
+    try {
+      pw = new PrintWriter(FileSystem.create(
+            fs, commandFile, TASK_LAUNCH_SCRIPT_PERMISSION));
+      pw.write(cmdLine);
+    } catch (IOException ioe) {
+      LOG.error("Caught IOException while writing JVM command line to file. ",
+          ioe);
+    } finally {
+      if (pw != null) {
+        pw.close();
+      }
     }
-
-    /**
-     * Makes the path(and its subdirectories recursively) fully deletable by
-     * setting proper permissions(770) by task-controller
-     */
-    @Override
-    protected void enablePathForCleanup() throws IOException {
-      getPathForCleanup();// allow init of fullPath, if not inited already
-      if (fs.exists(new Path(fullPath))) {
-        taskController.enableTaskForCleanup(this);
+    return commandFile.makeQualified(fs).toUri().getPath();
+  }
+  
+  protected void logOutput(String output) {
+    String shExecOutput = output;
+    if (shExecOutput != null) {
+      for (String str : shExecOutput.split("\n")) {
+        LOG.info(str);
       }
     }
   }
 
-  /** Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the job-local-dir.
-   */
-  static class TaskControllerJobPathDeletionContext 
-  extends TaskControllerPathDeletionContext {
-    final JobID jobId;
-    
-    public TaskControllerJobPathDeletionContext(FileSystem fs, 
-        Path mapredLocalDir, JobID id, String user, 
+  public static final boolean isSetsidAvailable = isSetsidSupported();
+  private static boolean isSetsidSupported() {
+    ShellCommandExecutor shexec = null;
+    boolean setsidSupported = true;
+    try {
+      String[] args = {"setsid", "bash", "-c", "echo $$"};
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("setsid is not available on this machine. So not using it.");
+      setsidSupported = false;
+    } finally { // handle the exit code
+      LOG.info("setsid exited with exit code " + shexec.getExitCode());
+    }
+    return setsidSupported;
+  }
+
+  public static class DelayedProcessKiller extends Thread {
+    private final String user;
+    private final int pid;
+    private final long delay;
+    private final Signal signal;
+    private final TaskController taskController;
+    public DelayedProcessKiller(String user, int pid, long delay, Signal signal,
         TaskController taskController) {
-      super(fs, mapredLocalDir, taskController, user);
-      this.jobId = id;
-    }
-    
-    /**
-     * Returns the jobLocalDir of the job to be cleaned up.
-     */
-    @Override
-    protected String getPath() {
-      return TaskTracker.getLocalJobDir(user, jobId.toString());
+      this.user = user;
+      this.pid = pid;
+      this.delay = delay;
+      this.signal = signal;
+      this.taskController = taskController;
+      setName("Task killer for " + pid);
+      setDaemon(false);
     }
-    
-    /**
-     * Makes the path(and its sub-directories recursively) fully deletable by
-     * setting proper permissions(770) by task-controller
-     */
     @Override
-    protected void enablePathForCleanup() throws IOException {
-      getPathForCleanup();// allow init of fullPath, if not inited already
-      if (fs.exists(new Path(fullPath))) {
-        taskController.enableJobForCleanup(this);
+    public void run() {
+      try {
+        Thread.sleep(delay);
+        taskController.signalTask(user, pid, signal);
+      } catch (InterruptedException e) {
+        return;
+      } catch (IOException e) {
+        LOG.warn("Exception when killing task " + pid, e);
       }
     }
   }
-  
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  public static class InitializationContext {
-    public File workDir;
-    public String user;
-    
-    public InitializationContext() {
-    }
-    
-    public InitializationContext(String user, File workDir) {
-      this.user = user;
-      this.workDir = workDir;
-    }
-  }
-  
-  /**
-   * This is used for initializing the private localized files in distributed
-   * cache. Initialization would involve changing permission, ownership and etc.
-   */
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  public static class DistributedCacheFileContext extends InitializationContext {
-    // base directory under which file has been localized
-    Path localizedBaseDir;
-    // the unique string used to construct the localized path
-    String uniqueString;
-
-    public DistributedCacheFileContext(String user, File workDir,
-        Path localizedBaseDir, String uniqueString) {
-      super(user, workDir);
-      this.localizedBaseDir = localizedBaseDir;
-      this.uniqueString = uniqueString;
-    }
-
-    public Path getLocalizedUniqueDir() {
-      return new Path(localizedBaseDir, new Path(TaskTracker
-          .getPrivateDistributedCacheDir(user), uniqueString));
-    }
-  }
-
-  static class JobInitializationContext extends InitializationContext {
-    JobID jobid;
-  }
-  
-  static class DebugScriptContext extends TaskExecContext {
-    List<String> args;
-    File workDir;
-    File stdout;
-  }
-
-  /**
-   * Sends a graceful terminate signal to taskJVM and it sub-processes. 
-   *   
-   * @param context task context
-   */
-  abstract void terminateTask(TaskControllerContext context);
-  
-  /**
-   * Sends a KILL signal to forcefully terminate the taskJVM and its
-   * sub-processes.
-   * 
-   * @param context task context
-   */
-  abstract void killTask(TaskControllerContext context);
-
-
-  /**
-   * Sends a QUIT signal to direct the task JVM (and sub-processes) to
-   * dump their stack to stdout.
-   *
-   * @param context task context.
-   */
-  abstract void dumpTaskStack(TaskControllerContext context);
 
-  /**
-   * Initialize user on this TaskTracer in a TaskController specific manner.
-   * 
-   * @param context
-   * @throws IOException
-   */
-  public abstract void initializeUser(InitializationContext context)
-      throws IOException;
-  
-  /**
-   * Launch the task debug script
-   * 
-   * @param context
-   * @throws IOException
-   */
-  abstract void runDebugScript(DebugScriptContext context) 
-      throws IOException;
-  
-  /**
-   * Enable the task for cleanup by changing permissions of the path
-   * @param context   path deletion context
-   * @throws IOException
-   */
-  abstract void enableTaskForCleanup(PathDeletionContext context)
-      throws IOException;
-  
-  /**
-   * Enable the job for cleanup by changing permissions of the path
-   * @param context   path deletion context
-   * @throws IOException
-   */
-  abstract void enableJobForCleanup(PathDeletionContext context)
-    throws IOException;
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Mar  8 05:56:27 2011
@@ -40,8 +40,8 @@ import org.apache.hadoop.fs.LocalFileSys
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -152,7 +152,14 @@ public class TaskLog {
 
   static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
     String cleanupSuffix = isCleanup ? ".cleanup" : "";
-    return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
+    return getAttemptDir(taskid.getJobID().toString(), 
+        taskid.toString() + cleanupSuffix);
+  }
+  
+  static File getAttemptDir(String jobid, String taskid) {
+    // taskid should be fully formed and it should have the optional 
+    // .cleanup suffix
+    return new File(getJobDir(jobid), taskid);
   }
   private static long prevOutLength;
   private static long prevErrLength;
@@ -468,21 +475,23 @@ public class TaskLog {
     
     String stdout = FileUtil.makeShellPath(stdoutFilename);
     String stderr = FileUtil.makeShellPath(stderrFilename);    
-    StringBuffer mergedCmd = new StringBuffer();
+    StringBuilder mergedCmd = new StringBuilder();
     
     // Export the pid of taskJvm to env variable JVM_PID.
     // Currently pid is not used on Windows
     if (!Shell.WINDOWS) {
-      mergedCmd.append(" export JVM_PID=`echo $$` ; ");
+      mergedCmd.append("export JVM_PID=`echo $$` ; ");
     }
 
-    if (setup != null && setup.size() > 0) {
-      mergedCmd.append(addCommand(setup, false));
-      mergedCmd.append(";");
+    if (setup != null) {
+      for (String s : setup) {
+        mergedCmd.append(s);
+        mergedCmd.append("\n");
+      }
     }
     if (tailLength > 0) {
       mergedCmd.append("(");
-    } else if(ProcessTree.isSetsidAvailable && useSetsid &&
+    } else if(TaskController.isSetsidAvailable && useSetsid &&
         !Shell.WINDOWS) {
       mergedCmd.append("exec setsid ");
     } else {
@@ -555,7 +564,7 @@ public class TaskLog {
    */
   public static String addCommand(List<String> cmd, boolean isExecutable) 
   throws IOException {
-    StringBuffer command = new StringBuffer();
+    StringBuilder command = new StringBuilder();
     for(String s: cmd) {
     	command.append('\'');
       if (isExecutable) {
@@ -604,11 +613,21 @@ public class TaskLog {
   /**
    * Get the user log directory for the job jobid.
    * 
-   * @param jobid
+   * @param jobid string representation of the jobid
+   * @return user log directory for the job
+   */
+  public static File getJobDir(String jobid) {
+    return new File(getUserLogDir(), jobid);
+  }
+  
+  /**
+   * Get the user log directory for the job jobid.
+   * 
+   * @param jobid the jobid object
    * @return user log directory for the job
    */
   public static File getJobDir(JobID jobid) {
-    return new File(getUserLogDir(), jobid.toString());
+    return getJobDir(jobid.toString());
   }
 
 } // TaskLog