Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:56:31 UTC

svn commit: r1079211 [9/11] - in /hadoop/mapreduce/branches/yahoo-merge: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/fairscheduler/designdoc/ src/contrib/streaming/s...

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Mar  8 05:56:27 2011
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapred.TaskTrac
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -46,14 +45,13 @@ class TaskMemoryManagerThread extends Th
   private static Log LOG = LogFactory.getLog(TaskMemoryManagerThread.class);
 
   private TaskTracker taskTracker;
-  private long monitoringInterval;
-
-  private long maxMemoryAllowedForAllTasks;
   private long maxRssMemoryAllowedForAllTasks;
 
-  private Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
-  private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
-  private List<TaskAttemptID> tasksToBeRemoved;
+  private final long monitoringInterval;
+  private final long maxMemoryAllowedForAllTasks;
+  private final Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
+  private final Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
+  private final List<TaskAttemptID> tasksToBeRemoved;
 
   private static final String MEMORY_USAGE_STRING =
     "Memory usage of ProcessTree %s for task-id %s : Virutal %d bytes, " +
@@ -91,7 +89,8 @@ class TaskMemoryManagerThread extends Th
     this.monitoringInterval = monitoringInterval;
   }
 
-  public void addTask(TaskAttemptID tid, long memLimit, long memLimitPhysical) {
+  public void addTask(TaskAttemptID tid, long memLimit,
+      long memLimitPhysical) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
       ProcessTreeInfo ptInfo =
@@ -204,19 +203,10 @@ class TaskMemoryManagerThread extends Th
             if (pId != null) {
               // pId will be null, either if the JVM is not spawned yet or if
               // the JVM is removed from jvmIdToPid
-              long sleeptimeBeforeSigkill =
-                  taskTracker
-                      .getJobConf()
-                      .getLong(
-                          TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
-                          ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
-              // create process tree object
-              ProcfsBasedProcessTree pt =
-                  new ProcfsBasedProcessTree(pId,
-                      ProcessTree.isSetsidAvailable, sleeptimeBeforeSigkill);
               LOG.debug("Tracking ProcessTree " + pId + " for the first time");
 
+              ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId,
+                  TaskController.isSetsidAvailable);
               ptInfo.setPid(pId);
               ptInfo.setProcessTree(pt);
             }
@@ -284,9 +274,13 @@ class TaskMemoryManagerThread extends Th
             // Virtual or physical memory over limit. Fail the task and remove
             // the corresponding process tree
             LOG.warn(msg);
+            // log an error if the task's root process is not a group leader
+            if (!pTree.checkPidPgrpidForMatch()) {
+              LOG.error("Killed task process with PID " + pId +
+                  " but it is not a process group leader.");
+            }
+            // kill the task
             taskTracker.cleanUpOverMemoryTask(tid, true, msg);
-            // Now destroy the ProcessTree, remove it from monitoring map.
-            pTree.destroy(true/*in the background*/);
             it.remove();
             LOG.info("Removed ProcessTree with root " + pId);
           } else {
@@ -298,8 +292,7 @@ class TaskMemoryManagerThread extends Th
         } catch (Exception e) {
           // Log the exception and proceed to the next task.
           LOG.warn("Uncaught exception in TaskMemoryManager "
-              + "while managing memory of " + tid + " : "
-              + StringUtils.stringifyException(e));
+              + "while managing memory of " + tid, e);
         }
       }
 
@@ -528,8 +521,6 @@ class TaskMemoryManagerThread extends Th
     taskTracker.cleanUpOverMemoryTask(tid, false, msg);
     // Now destroy the ProcessTree, remove it from monitoring map.
     ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
-    ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
-    pTree.destroy(true/*in the background*/);
     processTreeInfoMap.remove(tid);
     LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
   }
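
For context, the over-limit handling in the hunks above is easier to see in
isolation. The following is a compile-only sketch, not the committed class:
TreeStub and TrackerStub are hypothetical stand-ins for ProcfsBasedProcessTree
and TaskTracker. The point of the change is that the monitor thread now only
reports and delegates the kill to the TaskTracker, instead of destroying the
process tree itself.

    import java.util.Iterator;
    import java.util.Map;

    // Sketch only: the real classes carry far more state.
    class OverLimitPathSketch {
      interface TreeStub {               // stand-in for ProcfsBasedProcessTree
        boolean checkPidPgrpidForMatch();
      }
      interface TrackerStub {            // stand-in for TaskTracker
        void cleanUpOverMemoryTask(String tid, boolean wasFailure, String msg);
      }

      static void onOverLimit(TrackerStub tracker, TreeStub pTree,
          Iterator<Map.Entry<String, TreeStub>> it, String tid, String pid,
          String msg) {
        System.err.println(msg);           // LOG.warn(msg) in the patch
        if (!pTree.checkPidPgrpidForMatch()) {
          // a group kill may miss descendants of a non-leader root process
          System.err.println("PID " + pid + " is not a process group leader");
        }
        tracker.cleanUpOverMemoryTask(tid, true, msg); // the TT does the kill
        it.remove();                       // stop monitoring this attempt
      }
    }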

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Mar  8 05:56:27 2011
@@ -20,16 +20,16 @@ package org.apache.hadoop.mapred;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Vector;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,21 +67,30 @@ abstract class TaskRunner extends Thread
   private boolean exitCodeSet = false;
   
   private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
+  static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
 
   
   private TaskTracker tracker;
   private TaskDistributedCacheManager taskDistributedCacheManager;
+  private String[] localdirs;
+  private static final Random rand = new Random();
 
   protected JobConf conf;
   JvmManager jvmManager;
 
   public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
-      JobConf conf) {
+                    JobConf conf, TaskTracker.RunningJob rjob
+                    ) throws IOException {
     this.tip = tip;
     this.t = tip.getTask();
     this.tracker = tracker;
     this.conf = conf;
     this.jvmManager = tracker.getJvmManagerInstance();
+    this.localdirs = conf.getLocalDirs();
+    taskDistributedCacheManager = rjob.distCacheMgr;
   }
 
   public Task getTask() { return t; }
@@ -155,26 +164,13 @@ abstract class TaskRunner extends Thread
       //all the archives
       TaskAttemptID taskid = t.getTaskID();
       final LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
-      final File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
-
-      // We don't create any symlinks yet, so presence/absence of workDir
-      // actually on the file system doesn't matter.
-      tip.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
-        public Void run() throws IOException {
-          taskDistributedCacheManager = 
-            tracker.getTrackerDistributedCacheManager()
-            .newTaskDistributedCacheManager(conf);
-          taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
-              .getPrivateDistributedCacheDir(conf.getUser()), 
-          TaskTracker.getPublicDistributedCacheDir());
-          return null;
-        }
-      });
-
-      // Set up the child task's configuration. After this call, no localization
-      // of files should happen in the TaskTracker's process space. Any changes to
-      // the conf object after this will NOT be reflected to the child.
-      setupChildTaskConfiguration(lDirAlloc);
+      //simply get the location of the workDir and pass it to the child. The
+      //child will do the actual dir creation
+      final File workDir =
+          new File(new Path(localdirs[rand.nextInt(localdirs.length)],
+              TaskTracker.getTaskWorkDir(t.getUser(),
+                  taskid.getJobID().toString(), taskid.toString(),
+                  t.isTaskCleanupTask())).toString());
 
       // Build classpath
       List<String> classPaths =
@@ -189,7 +185,7 @@ abstract class TaskRunner extends Thread
       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
-      List<String> setup = getVMSetupCmd();
+      String setup = getVMSetupCmd();
 
       // Set up the redirection of the task's stdout and stderr streams
       File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
@@ -202,7 +198,20 @@ abstract class TaskRunner extends Thread
       errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
                                    taskid, logSize);
 
-      launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);
+      // flatten the env as a set of export commands
+      List<String> setupCmds = new ArrayList<String>();
+      for (Entry<String, String> entry : env.entrySet()) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("export ");
+        sb.append(entry.getKey());
+        sb.append("=\"");
+        sb.append(entry.getValue());
+        sb.append("\"");
+        setupCmds.add(sb.toString());
+      }
+      setupCmds.add(setup);
+
+      launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
       if (exitCodeSet) {
         if (!killed && exitCode != 0) {
@@ -231,14 +240,6 @@ abstract class TaskRunner extends Thread
         LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
       }
     } finally {
-      try{
-        if (taskDistributedCacheManager != null) {
-          taskDistributedCacheManager.release();
-        }
-      }catch(IOException ie){
-        LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
-      }
-
       // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
       // *false* since the task has either
       // a) SUCCEEDED - which means commit has been done
@@ -247,11 +248,11 @@ abstract class TaskRunner extends Thread
     }
   }
 
-  void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
-      File stderr, long logSize, File workDir, Map<String, String> env)
-      throws InterruptedException {
+  void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
+      File stderr, long logSize, File workDir)
+      throws InterruptedException, IOException {
     jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
-        stderr, logSize, workDir, env, conf));
+        stderr, logSize, workDir, conf));
     synchronized (lock) {
       while (!done) {
         lock.wait();
@@ -302,7 +303,7 @@ abstract class TaskRunner extends Thread
                 .isTaskCleanupTask()), conf);
 
     // write the child's task configuration file to the local disk
-    writeLocalTaskFile(localTaskFile.toString(), conf);
+    JobLocalizer.writeLocalJobFile(localTaskFile, conf);
 
     // Set the final job file in the task. The child needs to know the correct
     // path to job.xml. So set this path accordingly.
@@ -312,16 +313,21 @@ abstract class TaskRunner extends Thread
   /**
    * @return the setup command(s) used to apply a memory ulimit to the
    *         child JVM, if any
    */
-  private List<String> getVMSetupCmd() {
-    String[] ulimitCmd = Shell.getUlimitMemoryCommand(getChildUlimit(conf));
-    List<String> setup = null;
-    if (ulimitCmd != null) {
-      setup = new ArrayList<String>();
-      for (String arg : ulimitCmd) {
-        setup.add(arg);
-      }
+  private String getVMSetupCmd() {
+    final int ulimit = getChildUlimit(conf);
+    if (ulimit <= 0) {
+      return "";
+    }
+    String[] setup = Shell.getUlimitMemoryCommand(ulimit);
+    StringBuilder command = new StringBuilder();
+    for (String str : setup) {
+      command.append('\'');
+      command.append(str);
+      command.append('\'');
+      command.append(" ");
     }
-    return setup;
+    command.append("\n");
+    return command.toString();
   }
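
For intuition, the two fragments above (the export flattening in run() and the
quoting in getVMSetupCmd()) cooperate to produce plain shell setup lines for
the JVM launcher. The following standalone sketch reproduces only the
string-building; the ulimit argv is hard-coded here rather than taken from
Shell.getUlimitMemoryCommand, and the env value is invented.

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Illustration of the setup strings built above; inputs are made up.
    public class SetupCmdSketch {
      public static void main(String[] args) {
        Map<String, String> env = new LinkedHashMap<String, String>();
        env.put("HADOOP_WORK_DIR", "/disk1/mapred/local/work");
        List<String> setupCmds = new ArrayList<String>();
        // env flattened into export commands, as in TaskRunner.run()
        for (Map.Entry<String, String> e : env.entrySet()) {
          setupCmds.add("export " + e.getKey() + "=\"" + e.getValue() + "\"");
        }
        // single-quoted, newline-terminated ulimit, as in getVMSetupCmd()
        String[] ulimitArgv = { "ulimit", "-v", "1048576" };  // assumed argv
        StringBuilder cmd = new StringBuilder();
        for (String s : ulimitArgv) {
          cmd.append('\'').append(s).append('\'').append(' ');
        }
        cmd.append('\n');
        setupCmds.add(cmd.toString());
        for (String s : setupCmds) {
          System.out.print(s.endsWith("\n") ? s : s + "\n");
        }
        // prints:
        //   export HADOOP_WORK_DIR="/disk1/mapred/local/work"
        //   'ulimit' '-v' '1048576'
      }
    }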
 
   /**
@@ -404,7 +410,7 @@ abstract class TaskRunner extends Thread
       vargs.add(javaOptsSplit[i]);
     }
 
-    Path childTmpDir = createChildTmpDir(workDir, conf);
+    Path childTmpDir = createChildTmpDir(workDir, conf, false);
     vargs.add("-Djava.io.tmpdir=" + childTmpDir);
 
     // Add classpath.
@@ -453,7 +459,7 @@ abstract class TaskRunner extends Thread
    * @throws IOException
    */
   static Path createChildTmpDir(File workDir,
-      JobConf conf)
+      JobConf conf, boolean createDir)
       throws IOException {
 
     // add java.io.tmpdir given by mapreduce.task.tmp.dir
@@ -463,10 +469,13 @@ abstract class TaskRunner extends Thread
     // if temp directory path is not absolute, prepend it with workDir.
     if (!tmpDir.isAbsolute()) {
       tmpDir = new Path(workDir.toString(), tmp);
-
-      FileSystem localFs = FileSystem.getLocal(conf);
-      if (!localFs.mkdirs(tmpDir) && localFs.getFileStatus(tmpDir).isFile()) {
-        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+      if (createDir) {
+        FileSystem localFs = FileSystem.getLocal(conf);
+        if (!localFs.mkdirs(tmpDir) && 
+            !localFs.getFileStatus(tmpDir).isDir()) {
+          throw new IOException("Mkdirs failed to create " +
+              tmpDir.toString());
+        }
       }
     }
     return tmpDir;
@@ -513,6 +522,7 @@ abstract class TaskRunner extends Thread
       ldLibraryPath.append(oldLdLibraryPath);
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+    env.put(HADOOP_WORK_DIR, workDir.toString());
     
     // put jobTokenFile name into env
     String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
@@ -573,25 +583,6 @@ abstract class TaskRunner extends Thread
   }
 
   /**
-   * Write the task specific job-configuration file.
-   * 
-   * @param localFs
-   * @throws IOException
-   */
-  private static void writeLocalTaskFile(String jobFile, JobConf conf)
-      throws IOException {
-    Path localTaskFile = new Path(jobFile);
-    FileSystem localFs = FileSystem.getLocal(conf);
-    localFs.delete(localTaskFile, true);
-    OutputStream out = localFs.create(localTaskFile);
-    try {
-      conf.writeXml(out);
-    } finally {
-      out.close();
-    }
-  }
-
-  /**
    * Prepare the Configs.LOCAL_DIR for the child. The child is sand-boxed now.
    * Whenever it uses LocalDirAllocator from now on inside the child, it will
    * only see files inside the attempt-directory. This is done in the Child's
@@ -615,15 +606,11 @@ abstract class TaskRunner extends Thread
   }
 
   /** Creates the working directory pathname for a task attempt. */ 
-  static File formWorkDir(LocalDirAllocator lDirAlloc, 
-      TaskAttemptID task, boolean isCleanup, JobConf conf) 
+  static Path formWorkDir(LocalDirAllocator lDirAlloc, JobConf conf) 
       throws IOException {
     Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
-            conf.getUser(), task.getJobID().toString(), task.toString(),
-            isCleanup), conf);
-
-    return new File(workDir.toString());
+        lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR, conf);
+    return workDir;
   }
 
   private static void appendSystemClasspaths(List<String> classPaths) {
@@ -715,7 +702,7 @@ abstract class TaskRunner extends Thread
       }
     }
 
-    createChildTmpDir(workDir, conf);
+    createChildTmpDir(workDir, conf, true);
   }
 
   /**
@@ -739,8 +726,10 @@ abstract class TaskRunner extends Thread
 
   /**
    * Kill the child process
+   * @throws InterruptedException 
+   * @throws IOException 
    */
-  public void kill() {
+  public void kill() throws IOException, InterruptedException {
     killed = true;
     jvmManager.taskKilled(this);
     signalDone();
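
One structural change in this file deserves a note: the work directory is no
longer created or localized by the TaskRunner; it is merely named, by picking
one of the configured local dirs at random, and the child creates it (hence
the new HADOOP_WORK_DIR environment variable). A minimal sketch of that
selection, with taskWorkDir() as a hypothetical stand-in for
TaskTracker.getTaskWorkDir:

    import java.io.File;
    import java.util.Random;

    // Sketch of the new work-dir selection: name the directory, do not
    // create it; creation is deferred to the child process.
    public class WorkDirSketch {
      private static final Random rand = new Random();

      // hypothetical stand-in for TaskTracker.getTaskWorkDir(...)
      static String taskWorkDir(String user, String jobId, String taskId) {
        return "taskTracker/" + user + "/jobcache/" + jobId + "/"
            + taskId + "/work";
      }

      static File chooseWorkDir(String[] localDirs, String user,
          String jobId, String taskId) {
        String root = localDirs[rand.nextInt(localDirs.length)];
        return new File(root, taskWorkDir(user, jobId, taskId));
      }

      public static void main(String[] args) {
        String[] dirs = { "/disk1/mapred/local", "/disk2/mapred/local" };
        System.out.println(chooseWorkDir(dirs, "alice",
            "job_201103080001_0001", "attempt_201103080001_0001_m_000000_0"));
      }
    }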

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar  8 05:56:27 2011
@@ -65,31 +65,27 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerTaskPathDeletionContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerJobPathDeletionContext;
+import org.apache.hadoop.mapred.TaskController.DeletionContext;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -98,24 +94,26 @@ import org.apache.hadoop.metrics2.lib.Me
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import static org.apache.hadoop.metrics2.impl.MsInfo.*;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.mapreduce.util.ConfigUtil;
-import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
-import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin;
-import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
+import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
 
 /*******************************************************
  * TaskTracker is a process that starts and tracks MR Tasks
@@ -164,14 +162,15 @@ public class TaskTracker 
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
-  // Job ACLs file is created by TaskTracker under userlogs/$jobid directory for
-  // each job at job localization time. This will be used by TaskLogServlet for
-  // authorizing viewing of task logs of that job
+  //Job ACLs file is created by TaskController under userlogs/$jobid directory
+  //for each job at job localization time. This will be used by TaskLogServlet 
+  //for authorizing viewing of task logs of that job
   static String jobACLsFile = "job-acls.xml";
 
   volatile boolean running = true;
 
   private LocalDirAllocator localDirAllocator;
+  private String[] localdirs;
   String taskTrackerName;
   String localHostname;
   InetSocketAddress jobTrackAddr;
@@ -241,11 +240,12 @@ public class TaskTracker 
   static final String DISTCACHEDIR = "distcache";
   static final String JOBCACHE = "jobcache";
   static final String OUTPUT = "output";
-  private static final String JARSDIR = "jars";
+  static final String JARSDIR = "jars";
   static final String LOCAL_SPLIT_FILE = "split.dta";
   static final String LOCAL_SPLIT_META_FILE = "split.info";
   static final String JOBFILE = "job.xml";
   static final String JOB_TOKEN_FILE="jobToken"; //localized file
+  static final String TT_PRIVATE_DIR = "ttprivate";
 
   static final String JOB_LOCAL_DIR = MRJobConfig.JOB_LOCAL_DIR;
 
@@ -423,7 +423,6 @@ public class TaskTracker 
       RunningJob rJob = null;
       if (!runningJobs.containsKey(jobId)) {
         rJob = new RunningJob(jobId);
-        rJob.localized = false;
         rJob.tasks = new HashSet<TaskInProgress>();
         runningJobs.put(jobId, rJob);
       } else {
@@ -432,7 +431,6 @@ public class TaskTracker 
       synchronized (rJob) {
         rJob.tasks.add(tip);
       }
-      runningJobs.notify(); //notify the fetcher thread
       return rJob;
     }
   }
@@ -490,22 +488,32 @@ public class TaskTracker 
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
   
+  static String getPrivateDirJobConfFile(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobConfFile(user, jobid);
+  }
+  
   static String getLocalJobTokenFile(String user, String jobid) {
-    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR +
+      TaskTracker.JOB_TOKEN_FILE;
   }
 
-
   static String getTaskConfFile(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
     return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
         + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
 
+  static String getPrivateDirTaskScriptLocation(String user, String jobid,
+      String taskid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR +
+           getLocalTaskDir(user, jobid, taskid);
+  }
+
   static String getJobJarsDir(String user, String jobid) {
     return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
   }
 
-  static String getJobJarFile(String user, String jobid) {
+  public static String getJobJarFile(String user, String jobid) {
     return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
   }
 
@@ -529,7 +537,8 @@ public class TaskTracker 
         + TaskTracker.OUTPUT;
   }
 
-  static String getLocalTaskDir(String user, String jobid, String taskid) {
+  public static String getLocalTaskDir(String user, String jobid, 
+      String taskid) {
     return getLocalTaskDir(user, jobid, taskid, false);
   }
 
@@ -547,6 +556,19 @@ public class TaskTracker 
     String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
     return dir + Path.SEPARATOR + MRConstants.WORKDIR;
   }
+  
+  static String getPrivateDirJobTokenFile(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + 
+           getLocalJobTokenFile(user, jobid); 
+  }
+  
+  static String getPrivateDirForJob(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobDir(user, jobid);
+  }
+  
+  String[] getLocalDirs() {
+    return localdirs;
+  }
 
   String getPid(TaskAttemptID tid) {
     TaskInProgress tip = tasks.get(tid);
@@ -565,7 +587,48 @@ public class TaskTracker 
                             protocol);
     }
   }
-    
+
+  /**
+   * Delete all of the user directories.
+   * @param conf the TT configuration
+   * @throws IOException
+   */
+  private void deleteUserDirectories(Configuration conf) throws IOException {
+    for(String root: localdirs) {
+      for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
+        String owner = status.getOwner();
+        String path = status.getPath().getName();
+        if (path.equals(owner)) {
+          taskController.deleteAsUser(owner, "");
+        }
+      }
+    }
+  }
+  
+  public void cleanupAllVolumes() throws IOException {
+    for (int v = 0; v < localdirs.length; v++) {
+      // List all files inside the volumes
+      FileStatus[] files = localFs.listStatus(new Path(localdirs[v]));
+      for (int f = 0; f < files.length; f++) {
+        if (files[f].getPath().getName().equals(SUBDIR)) {
+          FileStatus[] userDirs =
+            localFs.listStatus(new Path(localdirs[v] + Path.SEPARATOR + SUBDIR));
+          for (int k = 0; k < userDirs.length; k++) {
+            // Get each user dir's name relative to the root of the volume
+            String relativeUserDir = SUBDIR + Path.SEPARATOR +
+              userDirs[k].getPath().getName();
+            getAsyncDiskService().moveAndDeleteRelativePath(localdirs[v],
+                                                            relativeUserDir);
+          }
+        }
+      }
+      getAsyncDiskService().moveAndDeleteFromEachVolume(TT_PRIVATE_DIR);
+    }
+  }
+
   /**
    * Do the real constructor work here.  It's in a separate method
    * so we can call it again and "recycle" the object after calling
@@ -592,9 +655,18 @@ public class TaskTracker 
  
     // Check local disk, start async disk service, and clean up all 
     // local directories.
-    checkLocalDirs(this.fConf.getLocalDirs());
-    setAsyncDiskService(new MRAsyncDiskService(fConf));
-    getAsyncDiskService().cleanupAllVolumes();
+    checkLocalDirs((localdirs = this.fConf.getLocalDirs()));
+    setAsyncDiskService(new MRAsyncDiskService(localFs, 
+                            taskController, fConf.getLocalDirs()));
+    cleanupAllVolumes();
+    final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
+    for (String s : localdirs) {
+      localFs.mkdirs(new Path(s, SUBDIR), ttdir);
+    }
+    final FsPermission priv = FsPermission.createImmutable((short) 0700);
+    for (String s : localdirs) {
+      localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
+    }
 
     // Clear out state tables
     this.tasks.clear();
@@ -652,15 +724,6 @@ public class TaskTracker 
     this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
     LOG.info("Starting tracker " + taskTrackerName);
 
-    Class<? extends TaskController> taskControllerClass = fConf.getClass(
-        TT_TASK_CONTROLLER, DefaultTaskController.class, TaskController.class);
-    taskController = (TaskController) ReflectionUtils.newInstance(
-        taskControllerClass, fConf);
-
-
-    // setup and create jobcache directory with appropriate permissions
-    taskController.setup();
-
     // Initialize DistributedCache
     this.distributedCacheManager = 
         new TrackerDistributedCacheManager(this.fConf, taskController,
@@ -705,7 +768,7 @@ public class TaskTracker 
     reduceLauncher.start();
 
     // create a localizer instance
-    setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
+    setLocalizer(new Localizer(localFs, fConf.getLocalDirs()));
 
     //Start up node health checker service.
     if (shouldStartHealthMonitor(this.fConf)) {
@@ -801,6 +864,9 @@ public class TaskTracker 
       List <FetchStatus> fList = new ArrayList<FetchStatus>();
       for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) {
         RunningJob rjob = item.getValue();
+        if (!rjob.localized) {
+          continue;
+        }
         JobID jobId = item.getKey();
         FetchStatus f;
         synchronized (rjob) {
@@ -974,33 +1040,29 @@ public class TaskTracker 
     Task t = tip.getTask();
     JobID jobId = t.getJobID();
     RunningJob rjob = addTaskToJob(jobId, tip);
-
-    // Initialize the user directories if needed.
-    getLocalizer().initializeUserDirs(t.getUser());
+    InetSocketAddress ttAddr = getTaskTrackerReportAddress();
 
     synchronized (rjob) {
       if (!rjob.localized) {
-       
-        JobConf localJobConf = localizeJobFiles(t, rjob);
-        // initialize job log directory
-        initializeJobLogDir(jobId, localJobConf);
-
-        // Now initialize the job via task-controller so as to set
-        // ownership/permissions of jars, job-work-dir. Note that initializeJob
-        // should be the last call after every other directory/file to be
-        // directly under the job directory is created.
-        JobInitializationContext context = new JobInitializationContext();
-        context.jobid = jobId;
-        context.user = t.getUser();
-        context.workDir = new File(localJobConf.get(JOB_LOCAL_DIR));
-        taskController.initializeJob(context);
-
+        Path localJobConfPath = initializeJob(t, rjob, ttAddr);
+        JobConf localJobConf = new JobConf(localJobConfPath);
+        // to be doubly sure, overwrite the user in the config with the one the
+        // TT thinks it is
+        localJobConf.setUser(t.getUser());
+        //also reset the #tasks per jvm
+        resetNumTasksPerJvm(localJobConf);
+        //set the base jobconf path in rjob; all tasks will use
+        //this as the base path when they run
+        rjob.localizedJobConf = localJobConfPath;
         rjob.jobConf = localJobConf;
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
         rjob.localized = true;
       }
     }
+    synchronized (runningJobs) {
+      runningJobs.notify(); //notify the fetcher thread
+    }
     return rjob;
   }
 
@@ -1019,31 +1081,32 @@ public class TaskTracker 
    * Localize the job on this tasktracker. Specifically
    * <ul>
    * <li>Cleanup and create job directories on all disks</li>
+   * <li>Download the credentials file</li>
    * <li>Download the job config file job.xml from the FS</li>
-   * <li>Create the job work directory and set {@link TaskTracker#JOB_LOCAL_DIR}
-   * in the configuration.
-   * <li>Download the job jar file job.jar from the FS, unjar it and set jar
-   * file in the configuration.</li>
+   * <li>Invokes the {@link TaskController} to do the rest of the job 
+   * initialization</li>
    * </ul>
-   * 
+   *
    * @param t task whose job has to be localized on this TT
-   * @return the modified job configuration to be used for all the tasks of this
-   *         job as a starting point.
+   * @param rjob the {@link RunningJob}
+   * @param ttAddr the tasktracker's RPC address
+   * @return the path to the job configuration to be used for all the tasks
+   *         of this job as a starting point.
    * @throws IOException
    */
-  JobConf localizeJobFiles(Task t, RunningJob rjob)
-      throws IOException, InterruptedException {
-    JobID jobId = t.getJobID();
-    String userName = t.getUser();
-
-    // Initialize the job directories
-    FileSystem localFs = FileSystem.getLocal(fConf);
-    getLocalizer().initializeJobDirs(userName, jobId);
+  Path initializeJob(final Task t, final RunningJob rjob, 
+      final InetSocketAddress ttAddr)
+  throws IOException, InterruptedException {
+    final JobID jobId = t.getJobID();
+
+    final Path jobFile = new Path(t.getJobFile());
+    final String userName = t.getUser();
+    final Configuration conf = getJobConf();
     // save local copy of JobToken file
-    String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+    final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
     rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
 
-    Credentials ts = TokenCache.loadTokens(localJobTokenFile, fConf);
+    Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
     Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
     if (jt != null) { //could be null in the case of some unit tests
       getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
@@ -1051,38 +1114,87 @@ public class TaskTracker 
     for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
       rjob.ugi.addToken(token);
     }
+    FileSystem userFs = getFS(jobFile, jobId, conf);
     // Download the job.xml for this job from the system FS
-    Path localJobFile =
-        localizeJobConfFile(new Path(t.getJobFile()), userName, jobId);
+    final Path localJobFile =
+      localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
 
-    JobConf localJobConf = new JobConf(localJobFile);
-    //WE WILL TRUST THE USERNAME THAT WE GOT FROM THE JOBTRACKER
-    //AS PART OF THE TASK OBJECT
-    localJobConf.setUser(userName);
-    
-    // set the location of the token file into jobConf to transfer 
-    // the name to TaskRunner
-    localJobConf.set(TokenCache.JOB_TOKENS_FILENAME,
-        localJobTokenFile.toString());
-    
-
-    // create the 'job-work' directory: job-specific shared directory for use as
-    // scratch space by all tasks of the same job running on this TaskTracker. 
-    Path workDir =
-        lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName, jobId
-            .toString()), fConf);
-    if (!localFs.mkdirs(workDir)) {
-      throw new IOException("Mkdirs failed to create " 
-                  + workDir.toString());
-    }
-    System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath());
-    localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
-    // Download the job.jar for this job from the system FS
-    localizeJobJarFile(userName, jobId, localFs, localJobConf);
-    
-    return localJobConf;
+    /*
+     * Now initialize the job via the task-controller to do the rest of the
+     * job-init. Do this within a doAs, since the public distributed cache
+     * is also set up here. To support potentially authenticated HDFS
+     * accesses, we need the job's tokens.
+     */
+    rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws IOException, InterruptedException {
+        try {
+          final JobConf localJobConf = new JobConf(localJobFile);
+          // Setup the public distributed cache
+          TaskDistributedCacheManager taskDistributedCacheManager =
+            getTrackerDistributedCacheManager()
+           .newTaskDistributedCacheManager(jobId, localJobConf);
+          rjob.distCacheMgr = taskDistributedCacheManager;
+          taskDistributedCacheManager.setupCache(localJobConf,
+            TaskTracker.getPublicDistributedCacheDir(),
+            TaskTracker.getPrivateDistributedCacheDir(userName));
+
+          // Set some config values
+          localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+              getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+          if (conf.get("slave.host.name") != null) {
+            localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+          }
+          resetNumTasksPerJvm(localJobConf);
+          localJobConf.setUser(t.getUser());
+
+          // write back the config (this config will have the updates that the
+          // distributed cache manager makes as well)
+          JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
+          taskController.initializeJob(t.getUser(), jobId.toString(), 
+              new Path(localJobTokenFile), localJobFile, TaskTracker.this,
+              ttAddr);
+        } catch (IOException e) {
+          LOG.warn("Exception during job localization", e);
+          throw e;
+        } catch (InterruptedException ie) {
+          LOG.warn("Exception during job localization", ie);
+          throw ie;
+        }
+        return null;
+      }
+    });
+    //search for the conf that the initializeJob created
+    //need to look up certain configs from this conf, like
+    //the distributed cache, profiling, etc. ones
+    Path initializedConf = lDirAlloc.getLocalPathToRead(getLocalJobConfFile(
+           userName, jobId.toString()), getJobConf());
+    return initializedConf;
+  }
+
+  /**
+   * If certain configs (debug scripts, profiling, or keeping task files)
+   * are enabled, then jvm-reuse should be disabled.
+   * @param localJobConf the localized job configuration to update
+   */
+  static void resetNumTasksPerJvm(JobConf localJobConf) {
+    boolean debugEnabled = false;
+    if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+      return;
+    }
+    if (localJobConf.getMapDebugScript() != null ||
+        localJobConf.getReduceDebugScript() != null) {
+      debugEnabled = true;
+    }
+    String keepPattern = localJobConf.getKeepTaskFilesPattern();
+
+    if (debugEnabled || localJobConf.getProfileEnabled() ||
+        keepPattern != null || localJobConf.getKeepFailedTaskFiles()) {
+      //disable jvm reuse
+      localJobConf.setNumTasksToExecutePerJvm(1);
+    }
   }
 
+
   // Create job userlog dir.
   // Create job acls file in job log dir, if needed.
   void initializeJobLogDir(JobID jobId, JobConf localJobConf)
@@ -1146,15 +1258,16 @@ public class TaskTracker 
   /**
    * Download the job configuration file from the FS.
    * 
-   * @param t Task whose job file has to be downloaded
-   * @param jobId jobid of the task
+   * @param jobFile the original location of the configuration file
+   * @param user the user in question
+   * @param userFs the FileSystem created on behalf of the user
+   * @param jobId jobid in question
    * @return the local file system path of the downloaded file.
    * @throws IOException
    */
-  private Path localizeJobConfFile(Path jobFile, String user, JobID jobId)
-      throws IOException, InterruptedException {
+  private Path localizeJobConfFile(Path jobFile, String user, 
+      FileSystem userFs, JobID jobId) throws IOException {
     final JobConf conf = new JobConf(getJobConf());
-    FileSystem userFs = getFS(jobFile, jobId, conf);
     // Get sizes of JobFile
     // sizes are -1 if they are not present.
     FileStatus status = null;
@@ -1166,69 +1279,25 @@ public class TaskTracker 
       jobFileSize = -1;
     }
 
-    Path localJobFile =
-        lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user, jobId.toString()),
-            jobFileSize, fConf);
+    Path localJobFile = lDirAlloc.getLocalPathForWrite(
+        getPrivateDirJobConfFile(user, jobId.toString()), jobFileSize, fConf);
 
     // Download job.xml
     userFs.copyToLocalFile(jobFile, localJobFile);
     return localJobFile;
   }
 
-  /**
-   * Download the job jar file from FS to the local file system and unjar it.
-   * Set the local jar file in the passed configuration.
-   * 
-   * @param jobId
-   * @param localFs
-   * @param localJobConf
-   * @throws IOException
-   */
-  private void localizeJobJarFile(String user, JobID jobId, FileSystem localFs,
-      JobConf localJobConf)
-      throws IOException, InterruptedException {
-    // copy Jar file to the local FS and unjar it.
-    String jarFile = localJobConf.getJar();
-    FileStatus status = null;
-    long jarFileSize = -1;
-    if (jarFile != null) {
-      Path jarFilePath = new Path(jarFile);
-      FileSystem fs = getFS(jarFilePath, jobId, localJobConf);
-      try {
-        status = fs.getFileStatus(jarFilePath);
-        jarFileSize = status.getLen();
-      } catch (FileNotFoundException fe) {
-        jarFileSize = -1;
-      }
-      // Here we check for five times the size of jarFileSize to accommodate for
-      // unjarring the jar file in the jars directory
-      Path localJarFile =
-          lDirAlloc.getLocalPathForWrite(
-              getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
-
-      // Download job.jar
-      fs.copyToLocalFile(jarFilePath, localJarFile);
-
-      localJobConf.setJar(localJarFile.toString());
-
-      // Un-jar the parts of the job.jar that need to be added to the classpath
-      RunJar.unJar(
-        new File(localJarFile.toString()),
-        new File(localJarFile.getParent().toString()),
-        localJobConf.getJarUnpackPattern());
-    }
-  }
-
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
-      UserGroupInformation ugi) throws IOException {
+        RunningJob rjob) throws IOException {
     synchronized (tip) {
       tip.setJobConf(jobConf);
-      tip.setUGI(ugi);
-      tip.launchTask();
+      tip.setUGI(rjob.ugi);
+      tip.launchTask(rjob);
     }
   }
     
-  public synchronized void shutdown() throws IOException {
+  public synchronized void shutdown()
+      throws IOException, InterruptedException  {
     shuttingDown = true;
     close();
     if (this.server != null) {
@@ -1245,8 +1314,9 @@ public class TaskTracker 
    * any running tasks or threads, and cleanup disk space.  A new TaskTracker
    * within the same process space might be restarted, so everything must be
    * clean.
+   * @throws InterruptedException 
    */
-  public synchronized void close() throws IOException {
+  public synchronized void close() throws IOException, InterruptedException {
     //
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
@@ -1353,8 +1423,14 @@ public class TaskTracker 
     server.start();
     this.httpPort = server.getPort();
     checkJettyPort(httpPort);
+    Class<? extends TaskController> taskControllerClass =
+       conf.getClass("mapred.task.tracker.task-controller",
+                      DefaultTaskController.class, TaskController.class);
+    taskController =
+      (TaskController) ReflectionUtils.newInstance(taskControllerClass, conf);
+    taskController.setup(localDirAllocator);
     // create task log cleanup thread
-    setTaskLogCleanupThread(new UserLogCleaner(fConf));
+    setTaskLogCleanupThread(new UserLogCleaner(fConf, taskController));
 
     UserGroupInformation.setConfiguration(fConf);
     SecurityUtil.login(fConf, TTConfig.TT_KEYTAB_FILE, TTConfig.TT_USER_NAME);
@@ -1374,7 +1450,7 @@ public class TaskTracker 
   private void startCleanupThreads() throws IOException {
     taskCleanupThread.setDaemon(true);
     taskCleanupThread.start();
-    directoryCleanupThread = new CleanupQueue();
+    directoryCleanupThread = CleanupQueue.getInstance();
     // start tasklog cleanup thread
     taskLogCleanupThread.setDaemon(true);
     taskLogCleanupThread.start();
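
The switch from new CleanupQueue() to CleanupQueue.getInstance() implies a
single, process-wide deletion queue shared by every component that enqueues
PathDeletionContexts. A sketch of the singleton shape this implies, assuming
eager initialization (the real class also owns the background deletion
thread):

    // Sketch of the accessor implied above; the real CleanupQueue also
    // manages the background deletion thread and its queue of contexts.
    class CleanupQueueSketch {
      private static final CleanupQueueSketch INSTANCE =
          new CleanupQueueSketch();

      private CleanupQueueSketch() { }          // no external construction

      static CleanupQueueSketch getInstance() { // shared, process-wide
        return INSTANCE;
      }
    }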
@@ -1855,7 +1931,6 @@ public class TaskTracker 
           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
           tip.reportDiagnosticInfo(msg);
           myInstrumentation.timedoutTask(tip.getTask().getTaskID());
-          dumpTaskStack(tip);
           purgeTask(tip, true);
         }
       }
@@ -1863,86 +1938,6 @@ public class TaskTracker 
   }
 
   /**
-   * Builds list of PathDeletionContext objects for the given paths
-   */
-  private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs,
-      Path[] paths) {
-    int i = 0;
-    PathDeletionContext[] contexts = new PathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath());
-    }
-    return contexts;
-  }
-
-  /**
-   * Builds list of {@link TaskControllerJobPathDeletionContext} objects for a 
-   * job each pointing to the job's jobLocalDir.
-   * @param fs    : FileSystem in which the dirs to be deleted
-   * @param paths : mapred-local-dirs
-   * @param id    : {@link JobID} of the job for which the local-dir needs to 
-   *                be cleaned up.
-   * @param user  : Job owner's username
-   * @param taskController : the task-controller to be used for deletion of
-   *                         jobLocalDir
-   */
-  static PathDeletionContext[] buildTaskControllerJobPathDeletionContexts(
-      FileSystem fs, Path[] paths, JobID id, String user,
-      TaskController taskController)
-      throws IOException {
-    int i = 0;
-    PathDeletionContext[] contexts =
-                          new TaskControllerPathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new TaskControllerJobPathDeletionContext(fs, p, id, user,
-                                                               taskController);
-    }
-    return contexts;
-  } 
-  
-  /**
-   * Builds list of TaskControllerTaskPathDeletionContext objects for a task
-   * @param fs    : FileSystem in which the dirs to be deleted
-   * @param paths : mapred-local-dirs
-   * @param task  : the task whose taskDir or taskWorkDir is going to be deleted
-   * @param isWorkDir : the dir to be deleted is workDir or taskDir
-   * @param taskController : the task-controller to be used for deletion of
-   *                         taskDir or taskWorkDir
-   */
-  static PathDeletionContext[] buildTaskControllerTaskPathDeletionContexts(
-      FileSystem fs, Path[] paths, Task task, boolean isWorkDir,
-      TaskController taskController)
-      throws IOException {
-    int i = 0;
-    PathDeletionContext[] contexts =
-                          new TaskControllerPathDeletionContext[paths.length];
-
-    for (Path p : paths) {
-      contexts[i++] = new TaskControllerTaskPathDeletionContext(fs, p, task,
-                          isWorkDir, taskController);
-    }
-    return contexts;
-  }
-
-  /**
-   * Send a signal to a stuck task commanding it to dump stack traces
-   * to stderr before we kill it with purgeTask().
-   *
-   * @param tip {@link TaskInProgress} to dump stack traces.
-   */
-  private void dumpTaskStack(TaskInProgress tip) {
-    TaskRunner runner = tip.getTaskRunner();
-    if (null == runner) {
-      return; // tip is already abandoned.
-    }
-
-    JvmManager jvmMgr = runner.getJvmManager();
-    jvmMgr.dumpStack(runner);
-  }
-
-  /**
    * The task tracker is done with this job, so we need to clean up.
    * @param action The action with the job
    * @throws IOException
@@ -1958,7 +1953,12 @@ public class TaskTracker 
     if (rjob == null) {
       LOG.warn("Unknown job " + jobId + " being deleted.");
     } else {
-      synchronized (rjob) {            
+      synchronized (rjob) {
+        // decrement the reference counts for the items this job references
+        rjob.distCacheMgr.release();
+        // inform the cache manager that the job is done
+        getTrackerDistributedCacheManager()
+            .deleteTaskDistributedCacheManager(jobId);
         // Add this tips of this job to queue of tasks to be purged 
         for (TaskInProgress tip : rjob.tasks) {
           tip.jobHasFinished(false);
@@ -1970,7 +1970,7 @@ public class TaskTracker 
         // Delete the job directory for this  
         // task if the job is done/failed
         if (!rjob.keepJobFiles) {
-          removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID());
+          removeJobFiles(rjob.ugi.getShortUserName(), rjob.getJobID());
         }
         // add job to taskLogCleanupThread
         long now = System.currentTimeMillis();
@@ -1998,15 +1998,25 @@ public class TaskTracker 
   /**
    * This job's files are no longer needed on this TT, remove them.
    * 
-   * @param rjob
+   * @param user User who ran the job
+   * @param jobId Remove local work dirs for this job
    * @throws IOException
    */
-  void removeJobFiles(String user, JobID jobId)
-      throws IOException {
-    PathDeletionContext[] contexts = 
-      buildTaskControllerJobPathDeletionContexts(localFs, 
-          getLocalFiles(fConf, ""), jobId, user, taskController);
-    directoryCleanupThread.addToQueue(contexts);
+  void removeJobFiles(String user, JobID jobId) throws IOException {
+    String jobDir = getLocalJobDir(user, jobId.toString());
+    PathDeletionContext jobCleanup = 
+      new TaskController.DeletionContext(getTaskController(), false, user, 
+                                         jobDir,
+                                         localdirs);
+    directoryCleanupThread.addToQueue(jobCleanup);
+    
+    for (String str : localdirs) {
+      Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
+        new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
+      PathDeletionContext ttPrivateJobCleanup =
+        new CleanupQueue.PathDeletionContext(ttPrivateJobDir, fConf);
+      directoryCleanupThread.addToQueue(ttPrivateJobCleanup);
+    }
   }
 
   /**
@@ -2311,12 +2321,14 @@ public class TaskTracker 
    * Start a new task.
    * All exceptions are handled locally, so that we don't mess up the
    * task tracker.
+   * @throws InterruptedException 
    */
-  void startNewTask(TaskInProgress tip) {
+  void startNewTask(TaskInProgress tip) throws InterruptedException {
     try {
       RunningJob rjob = localizeJob(tip);
+      tip.getTask().setJobFile(rjob.localizedJobConf.toString());
       // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
-      launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob.ugi); 
+      launchTaskForJob(tip, new JobConf(rjob.jobConf), rjob); 
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -2326,8 +2338,9 @@ public class TaskTracker 
         tip.kill(true);
         tip.cleanup(true);
       } catch (IOException ie2) {
-        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
-                 StringUtils.stringifyException(ie2));          
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+      } catch (InterruptedException ie2) {
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
       }
         
       // Careful! 
@@ -2448,7 +2461,7 @@ public class TaskTracker 
     private TaskRunner runner;
     volatile boolean done = false;
     volatile boolean wasKilled = false;
-    private JobConf defaultJobConf;
+    private JobConf ttConf;
     private JobConf localJobConf;
     private boolean keepFailedTaskFiles;
     private boolean alwaysKeepTaskFiles;
@@ -2480,7 +2493,7 @@ public class TaskTracker 
       this.task = task;
       this.launcher = launcher;
       this.lastProgressReport = System.currentTimeMillis();
-      this.defaultJobConf = conf;
+      this.ttConf = conf;
       localJobConf = null;
       if (task.isUberTask()) {
         taskStatus = new UberTaskStatus(
@@ -2504,66 +2517,9 @@ public class TaskTracker 
     }
 
     void localizeTask(Task task) throws IOException{
-
-      FileSystem localFs = FileSystem.getLocal(fConf);
-
-      // create taskDirs on all the disks.
-      getLocalizer().initializeAttemptDirs(task.getUser(),
-          task.getJobID().toString(), task.getTaskID().toString(),
-          task.isTaskCleanupTask());
-
-      // create the working-directory of the task 
-      Path cwd =
-          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
-              .getJobID().toString(), task.getTaskID().toString(), task
-              .isTaskCleanupTask()), defaultJobConf);
-      if (!localFs.mkdirs(cwd)) {
-        throw new IOException("Mkdirs failed to create " 
-                    + cwd.toString());
-      }
-
-      localJobConf.set(LOCAL_DIR,
-                       fConf.get(LOCAL_DIR));
-
-      if (fConf.get(TT_HOST_NAME) != null) {
-        localJobConf.set(TT_HOST_NAME, fConf.get(TT_HOST_NAME));
-      }
-            
-      keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
-
       // Do the task-type specific localization
+      //TODO: are these calls really required
       task.localizeConfiguration(localJobConf);
-      
-      List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
-      if (staticResolutions != null && staticResolutions.size() > 0) {
-        StringBuffer str = new StringBuffer();
-
-        for (int i = 0; i < staticResolutions.size(); i++) {
-          String[] hostToResolved = staticResolutions.get(i);
-          str.append(hostToResolved[0]+"="+hostToResolved[1]);
-          if (i != staticResolutions.size() - 1) {
-            str.append(',');
-          }
-        }
-        localJobConf.set(TT_STATIC_RESOLUTIONS, str.toString());
-      }
-      if (task.isMapTask()) {
-        debugCommand = localJobConf.getMapDebugScript();
-      } else {
-        debugCommand = localJobConf.getReduceDebugScript();
-      }
-      String keepPattern = localJobConf.getKeepTaskFilesPattern();
-      if (keepPattern != null) {
-        alwaysKeepTaskFiles = 
-          Pattern.matches(keepPattern, task.getTaskID().toString());
-      } else {
-        alwaysKeepTaskFiles = false;
-      }
-      if (debugCommand != null || localJobConf.getProfileEnabled() ||
-          alwaysKeepTaskFiles || keepFailedTaskFiles) {
-        //disable jvm reuse
-        localJobConf.setNumTasksToExecutePerJvm(1);
-      }
       task.setConf(localJobConf);
     }
         
@@ -2586,6 +2542,18 @@ public class TaskTracker 
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
       taskTimeout = localJobConf.getLong(MRJobConfig.TASK_TIMEOUT, 
                                          10 * 60 * 1000);
+      if (task.isMapTask()) {
+        debugCommand = localJobConf.getMapDebugScript();
+      } else {
+        debugCommand = localJobConf.getReduceDebugScript();
+      }
+      String keepPattern = localJobConf.getKeepTaskFilesPattern();
+      if (keepPattern != null) {
+        alwaysKeepTaskFiles = 
+          Pattern.matches(keepPattern, task.getTaskID().toString());
+      } else {
+        alwaysKeepTaskFiles = false;
+      }
     }
         
     public synchronized JobConf getJobConf() {
@@ -2606,7 +2574,7 @@ public class TaskTracker 
     /**
      * Kick off the task execution
      */
-    public synchronized void launchTask() throws IOException {
+    public synchronized void launchTask(RunningJob rjob) throws IOException {
       if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
           this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
           this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
@@ -2614,7 +2582,7 @@ public class TaskTracker 
         if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
           this.taskStatus.setRunState(TaskStatus.State.RUNNING);
         }
-        setTaskRunner(task.createRunner(TaskTracker.this, this));
+        setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
         this.runner.start();
         this.taskStatus.setStartTime(System.currentTimeMillis());
       } else {
@@ -2902,13 +2870,14 @@ public class TaskTracker 
       vargs.add(taskSyslog);
       vargs.add(jobConf);
       vargs.add(program);
-      DebugScriptContext context = 
-        new TaskController.DebugScriptContext();
-      context.args = vargs;
-      context.stdout = stdout;
-      context.workDir = workDir;
-      context.task = task;
-      getTaskController().runDebugScript(context);
+      // TODO: need to fix the debug script
+      //DebugScriptContext context = 
+      //  new TaskController.DebugScriptContext();
+      //context.args = vargs;
+      //context.stdout = stdout;
+      //context.workDir = workDir;
+      //context.task = task;
+      //getTaskController().runDebugScript(context);
       // add the lines of debug out to diagnostics
       int num = localJobConf.getInt(MRJobConfig.TASK_DEBUGOUT_LINES, -1);
       addDiagnostics(FileUtil.makeShellPath(stdout), num, "DEBUG OUT");
@@ -2989,7 +2958,12 @@ public class TaskTracker 
             getRunState() == TaskStatus.State.UNASSIGNED ||
             getRunState() == TaskStatus.State.COMMIT_PENDING ||
             isCleaningup()) {
-          kill(wasFailure);
+          try {
+            kill(wasFailure);
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted while killing " +
+                getTask().getTaskID(), e);
+          }
         }
       }
       
@@ -3001,8 +2975,10 @@ public class TaskTracker 
      * Something went wrong and the task must be killed.
      * 
      * @param wasFailure was it a failure (versus a kill request)?
+     * @throws InterruptedException 
      */
-    public synchronized void kill(boolean wasFailure) throws IOException {
+    public synchronized void kill(boolean wasFailure
+                                  ) throws IOException, InterruptedException {
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
           taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
           isCleaningup()) {
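
kill() now declares InterruptedException because killing may block waiting on the child process; callers bound to an IOException-only contract re-wrap the interrupt, as the catch block added above shows. A minimal sketch of that pattern, with hypothetical names standing in for the real methods:

    import java.io.IOException;

    class KillWrapSketch {
      // hypothetical stand-in for TaskInProgress.kill(boolean)
      void kill(boolean wasFailure) throws IOException, InterruptedException {
        // ... signal the task JVM and wait for it to exit ...
      }

      // callers that cannot propagate InterruptedException re-wrap it,
      // preserving the original exception as the cause
      void killQuietly(boolean wasFailure) throws IOException {
        try {
          kill(wasFailure);
        } catch (InterruptedException e) {
          throw new IOException("Interrupted while killing task", e);
        }
      }
    }
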
@@ -3106,7 +3082,7 @@ public class TaskTracker 
           return;
         }
         try {
-          removeTaskFiles(needCleanup, taskId);
+          removeTaskFiles(needCleanup);
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: "
               + StringUtils.stringifyException(ie));
@@ -3118,47 +3094,27 @@ public class TaskTracker 
      * Some or all of the files from this task are no longer required. Remove
      * them via CleanupQueue.
      * 
-     * @param needCleanup
-     * @param taskId
+     * @param removeOutputs remove the task's outputs as well as its work
+     *                      directory
      * @throws IOException 
      */
-    void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
-        throws IOException {
-      if (needCleanup) {
-        if (runner != null) {
-          // cleans up the output directory of the task (where map outputs
-          // and reduce inputs get stored)
-          runner.close();
-        }
-
-        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-          // No jvm reuse, remove everything
-          PathDeletionContext[] contexts =
-            buildTaskControllerTaskPathDeletionContexts(localFs,
-                getLocalFiles(fConf, ""), task, false/* not workDir */,
-                taskController);
-          directoryCleanupThread.addToQueue(contexts);
+    void removeTaskFiles(boolean removeOutputs) throws IOException {
+      if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+        String user = ugi.getShortUserName();
+        int userDirLen = TaskTracker.getUserDir(user).length();
+        String jobId = task.getJobID().toString();
+        String taskId = task.getTaskID().toString();
+        boolean cleanup = task.isTaskCleanupTask();
+        String taskDir;
+        if (!removeOutputs) {
+          taskDir = TaskTracker.getTaskWorkDir(user, jobId, taskId, cleanup);
         } else {
-          // Jvm reuse. We don't delete the workdir since some other task
-          // (running in the same JVM) might be using the dir. The JVM
-          // running the tasks would clean the workdir per a task in the
-          // task process itself.
-          String localTaskDir =
-            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
-                .toString(), task.isTaskCleanupTask());
-          PathDeletionContext[] contexts = buildPathDeletionContexts(
-              localFs, getLocalFiles(defaultJobConf, localTaskDir +
-                         Path.SEPARATOR + TaskTracker.JOBFILE));
-          directoryCleanupThread.addToQueue(contexts);
-        }
-      } else {
-        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-          PathDeletionContext[] contexts =
-            buildTaskControllerTaskPathDeletionContexts(localFs,
-              getLocalFiles(fConf, ""), task, true /* workDir */,
-              taskController);
-          directoryCleanupThread.addToQueue(contexts);
+          taskDir = TaskTracker.getLocalTaskDir(user, jobId, taskId, cleanup);
         }
+        PathDeletionContext item =
+          new TaskController.DeletionContext(taskController, false, user,
+                                             taskDir, localdirs);
+        directoryCleanupThread.addToQueue(item);
       }
     }
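
removeTaskFiles now hands a single user-relative path to an asynchronous deletion queue instead of building per-directory deletion contexts. A minimal sketch of that producer/consumer shape; DeletionItem and CleanupQueueSketch below are illustrative stand-ins for TaskController.DeletionContext and CleanupQueue, not the real classes:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class DeletionItem {
      final String user;     // deletion runs with this user's permissions
      final String relPath;  // path relative to each configured local dir
      DeletionItem(String user, String relPath) {
        this.user = user;
        this.relPath = relPath;
      }
    }

    class CleanupQueueSketch {
      private final BlockingQueue<DeletionItem> queue =
        new LinkedBlockingQueue<DeletionItem>();

      void addToQueue(DeletionItem item) {
        queue.add(item);  // returns immediately; a daemon thread drains it
      }
    }
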
         
@@ -3199,7 +3155,11 @@ public class TaskTracker 
     if (rjob == null) { //kill the JVM since the job is dead
       LOG.info("Killing JVM " + jvmId + " since job " + jvmId.getJobId() +
                " is dead");
-      jvmManager.killJvm(jvmId);
+      try {
+        jvmManager.killJvm(jvmId);
+      } catch (InterruptedException e) {
+        LOG.warn("Failed to kill " + jvmId, e);
+      }
       return new JvmTask(null, true);
     }
     TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);
@@ -3393,12 +3353,15 @@ public class TaskTracker 
   static class RunningJob{
     private JobID jobid; 
     private JobConf jobConf;
+    private Path localizedJobConf;
     // keep this for later use
     volatile Set<TaskInProgress> tasks;
-    boolean localized;
+    volatile boolean localized;
     boolean keepJobFiles;
     UserGroupInformation ugi;
     FetchStatus f;
+    TaskDistributedCacheManager distCacheMgr;
+    
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
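
The localized flag becomes volatile because it is written by the thread that localizes the job and read by task-launcher threads; volatile guarantees readers see the write without taking the RunningJob lock. A minimal sketch of the visibility pattern, with illustrative names:

    class LocalizedFlagSketch {
      private volatile boolean localized = false;

      void finishLocalization() {
        // ... download job files, set up directories ...
        localized = true;  // write is immediately visible to all threads
      }

      boolean isReadyToLaunch() {
        return localized;  // consistent read without locking
      }
    }
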
@@ -4055,46 +4018,54 @@ public class TaskTracker 
     return distributedCacheManager;
   }
   
-    /**
-     * Download the job-token file from the FS and save on local fs.
-     * @param user
-     * @param jobId
-     * @param jobConf
-     * @return the local file system path of the downloaded file.
-     * @throws IOException
-     */
-    private String localizeJobTokenFile(String user, JobID jobId)
-        throws IOException {
-      // check if the tokenJob file is there..
-      Path skPath = new Path(systemDirectory, 
-          jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
-      
-      FileStatus status = null;
-      long jobTokenSize = -1;
-      status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
-      jobTokenSize = status.getLen();
-      
-      Path localJobTokenFile =
-          lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user, 
-              jobId.toString()), jobTokenSize, fConf);
-      String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
-      LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() + 
-          " to " + localJobTokenFileStr);
-      
-      // Download job_token
-      systemFS.copyToLocalFile(skPath, localJobTokenFile);      
-      return localJobTokenFileStr;
-    }
-
-    JobACLsManager getJobACLsManager() {
-      return aclsManager.getJobACLsManager();
-    }
+  /**
+   * Download the job-token file from the FS and save on local fs.
+   * @param user the user that owns the job
+   * @param jobId the job whose token file is to be localized
+   * @return the local file system path of the downloaded file.
+   * @throws IOException
+   */
+  private String localizeJobTokenFile(String user, JobID jobId)
+      throws IOException {
+    // check that the job-token file is there
+    Path skPath = new Path(systemDirectory, 
+        jobId.toString()+"/"+TokenCache.JOB_TOKEN_HDFS_FILE);
     
-    ACLsManager getACLsManager() {
-      return aclsManager;
-    }
+    // getFileStatus throws FileNotFoundException if the token file
+    // is missing from the system directory
+    FileStatus status = systemFS.getFileStatus(skPath);
+    long jobTokenSize = status.getLen();
+    
+    Path localJobTokenFile =
+        lDirAlloc.getLocalPathForWrite(getPrivateDirJobTokenFile(user, 
+            jobId.toString()), jobTokenSize, fConf);
+    String localJobTokenFileStr = localJobTokenFile.toUri().getPath();
+    LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() + 
+        " to " + localJobTokenFileStr);
+    
+    // Download job_token
+    systemFS.copyToLocalFile(skPath, localJobTokenFile);
+    return localJobTokenFileStr;
+  }
+
+  JobACLsManager getJobACLsManager() {
+    return aclsManager.getJobACLsManager();
+  }
+  
+  ACLsManager getACLsManager() {
+    return aclsManager;
+  }
+
+  synchronized TaskInProgress getRunningTask(TaskAttemptID tid) {
+    return runningTasks.get(tid);
+  }
+
+  @Override
+  public void updatePrivateDistributedCacheSizes(
+      org.apache.hadoop.mapreduce.JobID jobId, long[] sizes)
+      throws IOException {
+    distributedCacheManager.setArchiveSizes(jobId, sizes);
+  }
 
-    synchronized TaskInProgress getRunningTask(TaskAttemptID tid) {
-      return runningTasks.get(tid);
-    }
 }
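
In localizeJobTokenFile above, getLocalPathForWrite asks the allocator for a path under one of the configured local directories with at least jobTokenSize bytes free, and the token is then copied down. A sketch of that allocate-then-copy shape; the config key and target file name here are illustrative only:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalDirAllocator;
    import org.apache.hadoop.fs.Path;

    class TokenCopySketch {
      Path fetch(FileSystem srcFs, Path src, Configuration conf)
          throws IOException {
        long len = srcFs.getFileStatus(src).getLen();
        LocalDirAllocator alloc = new LocalDirAllocator("mapred.local.dir");
        // choose a local dir that has at least len bytes available
        Path dst = alloc.getLocalPathForWrite("jobToken.private", len, conf);
        srcFs.copyToLocalFile(src, dst);
        return dst;
      }
    }
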

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Mar  8 05:56:27 2011
@@ -161,4 +161,13 @@ public interface TaskUmbilicalProtocol e
                                                        TaskAttemptID id) 
   throws IOException;
 
+  /**
+   * The job initializer needs to report the sizes of the archive
+   * objects in the private distributed cache.
+   * @param jobId the job to update
+   * @param sizes the array of sizes that were computed
+   * @throws IOException
+   */
+  void updatePrivateDistributedCacheSizes(org.apache.hadoop.mapreduce.JobID jobId,
+                                          long[] sizes) throws IOException;
 }
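
A minimal sketch of how a job localizer might report the computed archive sizes back through the new umbilical call; the surrounding class and method names are hypothetical, and only updatePrivateDistributedCacheSizes comes from this patch:

    import java.io.IOException;
    import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
    import org.apache.hadoop.mapreduce.JobID;

    class CacheSizeReporterSketch {
      void report(TaskUmbilicalProtocol umbilical, JobID jobId,
                  long[] archiveSizes) throws IOException {
        // archiveSizes[i] is the measured on-disk size of the i-th
        // private-cache archive for this job
        umbilical.updatePrivateDistributedCacheSizes(jobId, archiveSizes);
      }
    }
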

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java Tue Mar  8 05:56:27 2011
@@ -80,9 +80,9 @@ class UberTask extends Task {
   }
 
   @Override
-  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip)
-      throws IOException {
-    return new UberTaskRunner(tip, tracker, conf);
+  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+      TaskTracker.RunningJob rjob) throws IOException {
+    return new UberTaskRunner(tip, tracker, conf, rjob);
   }
 
   /* perhaps someday we'll allow an UberTask to run as either a MapTask or a

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTaskRunner.java Tue Mar  8 05:56:27 2011
@@ -25,8 +25,9 @@ import org.apache.log4j.Level;
 
 public class UberTaskRunner extends TaskRunner {
 
-  public UberTaskRunner(TaskInProgress tip, TaskTracker tracker, JobConf conf) {
-    super(tip, tracker, conf);
+  public UberTaskRunner(TaskInProgress tip, TaskTracker tracker, JobConf conf,
+      TaskTracker.RunningJob rjob) throws IOException {
+    super(tip, tracker, conf, rjob);
   }
 
   @Override

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UserLogCleaner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UserLogCleaner.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UserLogCleaner.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UserLogCleaner.java Tue Mar  8 05:56:27 2011
@@ -32,6 +32,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
@@ -53,15 +56,19 @@ class UserLogCleaner extends Thread {
   private Map<JobID, Long> completedJobs = Collections
       .synchronizedMap(new HashMap<JobID, Long>());
   private final long threadSleepTime;
-  private MRAsyncDiskService logAsyncDisk;
   private Clock clock;
+  private TaskController taskController;
+  private CleanupQueue cleanupQueue;
+  private FileSystem localFs;
 
-  UserLogCleaner(Configuration conf) throws IOException {
+  UserLogCleaner(Configuration conf, TaskController taskController) 
+  throws IOException {
     threadSleepTime = conf.getLong(TTConfig.TT_USERLOGCLEANUP_SLEEPTIME,
         DEFAULT_THREAD_SLEEP_TIME);
-    logAsyncDisk = new MRAsyncDiskService(FileSystem.getLocal(conf), TaskLog
-        .getUserLogDir().toString());
     setClock(new Clock());
+    localFs = FileSystem.getLocal(conf);
+    this.taskController = taskController;
+    cleanupQueue = CleanupQueue.getInstance();
   }
 
   void setClock(Clock clock) {
@@ -100,7 +107,7 @@ class UserLogCleaner extends Thread {
         // see if the job is old enough
         if (entry.getValue().longValue() <= now) {
-          // add the job logs directory to for delete
+          // add the job's log directory for deletion
-          deleteLogPath(TaskLog.getJobDir(entry.getKey()).getAbsolutePath());
+          deleteLogPath(entry.getKey().toString());
           completedJobIter.remove();
         }
       }
@@ -125,16 +132,12 @@ class UserLogCleaner extends Thread {
-        // add all the log dirs to taskLogsMnonitor.
+        // add all the log dirs to the user-log cleaner.
         long now = clock.getTime();
         for (String logDir : logDirs) {
-          if (logDir.equals(logAsyncDisk.TOBEDELETED)) {
-            // skip this
-            continue;
-          }
           JobID jobid = null;
           try {
             jobid = JobID.forName(logDir);
           } catch (IllegalArgumentException ie) {
             // if the directory is not a jobid, delete it immediately
-            deleteLogPath(new File(userLogDir, logDir).getAbsolutePath());
+            deleteLogPath(logDir);
             continue;
           }
           // add the job log directory with default retain hours, if it is not
@@ -197,6 +200,11 @@ class UserLogCleaner extends Thread {
    */
   private void deleteLogPath(String logPath) throws IOException {
     LOG.info("Deleting user log path " + logPath);
-    logAsyncDisk.moveAndDeleteAbsolutePath(logPath);
+    String logRoot = TaskLog.getUserLogDir().toString();
+    String user = localFs.getFileStatus(new Path(logRoot, logPath)).getOwner();
+    PathDeletionContext item = 
+      new TaskController.DeletionContext(taskController, true, user, logPath,
+                                         null);
+    cleanupQueue.addToQueue(item);
   }
 }
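
deleteLogPath now resolves the owner of the log directory so the queued deletion can run as that user rather than as the TaskTracker. A minimal sketch of the owner lookup; the path arguments are illustrative:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class OwnerLookupSketch {
      static String ownerOf(String logRoot, String jobDirName,
                            Configuration conf) throws IOException {
        FileSystem localFs = FileSystem.getLocal(conf);
        // the deletion must be performed as the user who owns the logs
        return localFs.getFileStatus(new Path(logRoot, jobDirName)).getOwner();
      }
    }
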

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobContext.java Tue Mar  8 05:56:27 2011
@@ -265,7 +265,7 @@ public interface JobContext extends MRJo
-   * @return a string array of timestamps 
+   * @return a long array of timestamps
    * @throws IOException
    */
-  public String[] getArchiveTimestamps();
+  public long[] getArchiveTimestamps();
 
   /**
    * Get the timestamps of the files.  Used by internal
@@ -273,7 +273,7 @@ public interface JobContext extends MRJo
-   * @return a string array of timestamps 
+   * @return a long array of timestamps
    * @throws IOException
    */
-  public String[] getFileTimestamps();
+  public long[] getFileTimestamps();
 
   /** 
    * Get the configured number of maximum attempts that will be made to run a
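
Callers of getArchiveTimestamps() and getFileTimestamps() previously received String[] and had to parse each entry; with this change they get long[] directly. A sketch of the adjusted caller side, where 'context' is any JobContext implementation:

    import org.apache.hadoop.mapreduce.JobContext;

    class TimestampSketch {
      static void printArchiveTimes(JobContext context) {
        // formerly: Long.parseLong(context.getArchiveTimestamps()[i])
        for (long ts : context.getArchiveTimestamps()) {
          System.out.println("archive mtime (ms): " + ts);
        }
      }
    }
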

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java Tue Mar  8 05:56:27 2011
@@ -118,4 +118,4 @@ public class JobSubmissionFiles {
     return stagingArea;
   }
   
-}
\ No newline at end of file
+}