You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2011/10/06 00:37:53 UTC

svn commit: r1179465 [2/3] - in /hadoop/common/branches/branch-0.20-security-205/src: core/org/apache/hadoop/util/ mapred/ mapred/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapreduce/ test/org/apache/hadoop/mapred/ test/org/apache/hadoop/util/

Added: hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java.orig
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java.orig?rev=1179465&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java.orig (added)
+++ hadoop/common/branches/branch-0.20-security-205/src/mapred/org/apache/hadoop/mapred/TaskTracker.java.orig Wed Oct  5 22:37:52 2011
@@ -0,0 +1,4227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.Map.Entry;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Pattern;
+
+import javax.crypto.SecretKey;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.server.tasktracker.*;
+import org.apache.hadoop.mapreduce.server.tasktracker.userlogs.*;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskLog.LogFileDetail;
+import org.apache.hadoop.mapred.TaskLog.LogName;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapred.pipes.Submitter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.security.Credentials;
+
+/*******************************************************
+ * TaskTracker is a process that starts and tracks MR Tasks
+ * in a networked environment.  It contacts the JobTracker
+ * for Task assignments and reporting results.
+ *
+ *******************************************************/
+public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
+    Runnable, TaskTrackerMXBean {
+  
+  /**
+   * @deprecated
+   */
+  @Deprecated
+  static final String MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY =
+    "mapred.tasktracker.vmem.reserved";
+  /**
+   * @deprecated
+   */
+  @Deprecated
+  static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY =
+     "mapred.tasktracker.pmem.reserved";
+
+  static final String CONF_VERSION_KEY = "mapreduce.tasktracker.conf.version";
+  static final String CONF_VERSION_DEFAULT = "default";
+
+  static final long WAIT_FOR_DONE = 3 * 1000;
+  private int httpPort;
+
+  static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
+
+  static{
+    Configuration.addDefaultResource("mapred-default.xml");
+    Configuration.addDefaultResource("mapred-site.xml");
+  }
+
+  public static final Log LOG =
+    LogFactory.getLog(TaskTracker.class);
+
+  public static final String MR_CLIENTTRACE_FORMAT =
+        "src: %s" +     // src IP
+        ", dest: %s" +  // dst IP
+        ", bytes: %s" + // byte count
+        ", op: %s" +    // operation
+        ", cliID: %s" +  // task id
+        ", duration: %s"; // duration
+  public static final Log ClientTraceLog =
+    LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
+
+  //Job ACLs file is created by TaskController under userlogs/$jobid directory
+  //for each job at job localization time. This will be used by TaskLogServlet 
+  //for authorizing viewing of task logs of that job
+  static String jobACLsFile = "job-acls.xml";
+
+  volatile boolean running = true;
+
+  /**
+   * Manages TT local storage directories.
+   */
+  static class LocalStorage {
+    private List<String> localDirs;
+    private int numFailures;
+
+    public LocalStorage(String[] dirs) {
+      localDirs = new ArrayList<String>();
+      localDirs.addAll(Arrays.asList(dirs));
+    }
+
+    /**
+     * @return the current valid directories 
+     */
+    synchronized String[] getDirs() {
+      return localDirs.toArray(new String[localDirs.size()]);
+    }
+
+    /**
+     * @return the current valid dirs as comma separated string
+     */
+    synchronized String getDirsString() {
+      return StringUtils.join(",", localDirs);
+    }
+
+    /**
+     * @return the number of valid local directories
+     */
+    synchronized int numDirs() {
+      return localDirs.size();
+    }
+
+    /**
+     * @return the number of directory failures
+     */
+    synchronized int numFailures() {
+      return numFailures;
+    }
+
+    /**
+     * Check the current set of local directories, updating the list
+     * of valid directories if necessary.
+     * @throws DiskErrorException if no directories are writable
+     */
+    synchronized void checkDirs() throws DiskErrorException {
+      for (String dir : localDirs) {
+        try {
+          DiskChecker.checkDir(new File(dir));
+        } catch (DiskErrorException de) {
+          LOG.warn("TaskTracker local dir " + dir + " error " + 
+              de.getMessage() + ", removing from local dirs");
+          localDirs.remove(dir);
+          numFailures++;
+        }
+      }
+      if (localDirs.isEmpty()) {
+        throw new DiskErrorException(
+            "No mapred local directories are writable");
+      }
+    }
+  }
+
+  private LocalStorage localStorage;
+  private long lastCheckDirsTime;
+  private int lastNumFailures;
+  private LocalDirAllocator localDirAllocator;
+  String taskTrackerName;
+  String localHostname;
+  InetSocketAddress jobTrackAddr;
+    
+  InetSocketAddress taskReportAddress;
+
+  Server taskReportServer = null;
+  InterTrackerProtocol jobClient;
+  
+  private TrackerDistributedCacheManager distributedCacheManager;
+  static int FILE_CACHE_SIZE = 2000;
+    
+  // last heartbeat response recieved
+  short heartbeatResponseId = -1;
+  
+  static final String TASK_CLEANUP_SUFFIX = ".cleanup";
+
+  /*
+   * This is the last 'status' report sent by this tracker to the JobTracker.
+   * 
+   * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
+   * indicating that a 'fresh' status report be generated; in the event the
+   * rpc calls fails for whatever reason, the previous status report is sent
+   * again.
+   */
+  TaskTrackerStatus status = null;
+  
+  // The system-directory on HDFS where job files are stored 
+  Path systemDirectory = null;
+  
+  // The filesystem where job files are stored
+  FileSystem systemFS = null;
+  private FileSystem localFs = null;
+  private final HttpServer server;
+    
+  volatile boolean shuttingDown = false;
+    
+  Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
+  /**
+   * Map from taskId -> TaskInProgress.
+   */
+  Map<TaskAttemptID, TaskInProgress> runningTasks = null;
+  Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
+  private final JobTokenSecretManager jobTokenSecretManager
+    = new JobTokenSecretManager();
+
+  JobTokenSecretManager getJobTokenSecretManager() {
+    return jobTokenSecretManager;
+  }
+
+  RunningJob getRunningJob(JobID jobId) {
+    return runningJobs.get(jobId);
+  }
+
+  volatile int mapTotal = 0;
+  volatile int reduceTotal = 0;
+  boolean justStarted = true;
+  boolean justInited = true;
+  // Mark reduce tasks that are shuffling to rollback their events index
+  Set<TaskAttemptID> shouldReset = new HashSet<TaskAttemptID>();
+    
+  //dir -> DF
+  Map<String, DF> localDirsDf = new HashMap<String, DF>();
+  long minSpaceStart = 0;
+  //must have this much space free to start new tasks
+  boolean acceptNewTasks = true;
+  long minSpaceKill = 0;
+  //if we run under this limit, kill one task
+  //and make sure we never receive any new jobs
+  //until all the old tasks have been cleaned up.
+  //this is if a machine is so full it's only good
+  //for serving map output to the other nodes
+
+  static Random r = new Random();
+  public static final String SUBDIR = "taskTracker";
+  static final String DISTCACHEDIR = "distcache";
+  static final String JOBCACHE = "jobcache";
+  static final String OUTPUT = "output";
+  static final String JARSDIR = "jars";
+  static final String LOCAL_SPLIT_FILE = "split.info";
+  static final String JOBFILE = "job.xml";
+  static final String TT_PRIVATE_DIR = "ttprivate";
+  public static final String TT_LOG_TMP_DIR = "tt_log_tmp";
+  static final String JVM_EXTRA_ENV_FILE = "jvm.extra.env";
+
+  static final String JOB_LOCAL_DIR = "job.local.dir";
+  static final String JOB_TOKEN_FILE="jobToken"; //localized file
+
+  private JobConf fConf;
+  private JobConf originalConf;
+  private Localizer localizer;
+  private int maxMapSlots;
+  private int maxReduceSlots;
+  private int failures;
+  final long mapRetainSize;
+  final long reduceRetainSize;
+
+  private ACLsManager aclsManager;
+  
+  // Performance-related config knob to send an out-of-band heartbeat
+  // on task completion
+  static final String TT_OUTOFBAND_HEARBEAT =
+    "mapreduce.tasktracker.outofband.heartbeat";
+  private volatile boolean oobHeartbeatOnTaskCompletion;
+  
+  // Track number of completed tasks to send an out-of-band heartbeat
+  private IntWritable finishedCount = new IntWritable(0);
+  
+  private MapEventsFetcherThread mapEventsFetcher;
+  final int workerThreads;
+  CleanupQueue directoryCleanupThread;
+  private volatile JvmManager jvmManager;
+  
+  private TaskMemoryManagerThread taskMemoryManager;
+  private boolean taskMemoryManagerEnabled = true;
+  private long totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
+  private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+
+  private UserLogManager userLogManager;
+
+  static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
+      "mapred.tasktracker.memory_calculator_plugin";
+
+  /**
+   * the minimum interval between jobtracker polls
+   */
+  private volatile int heartbeatInterval = HEARTBEAT_INTERVAL_MIN;
+  /**
+   * Number of maptask completion events locations to poll for at one time
+   */  
+  private int probe_sample_size = 500;
+
+  private IndexCache indexCache;
+
+  /**
+  * Handle to the specific instance of the {@link TaskController} class
+  */
+  private TaskController taskController;
+  
+  /**
+   * Handle to the specific instance of the {@link NodeHealthCheckerService}
+   */
+  private NodeHealthCheckerService healthChecker;
+  
+  /**
+   * Configuration property for disk health check interval in milli seconds.
+   * Currently, configuring this to a value smaller than the heartbeat interval
+   * is equivalent to setting this to heartbeat interval value.
+   */
+  static final String DISK_HEALTH_CHECK_INTERVAL_PROPERTY =
+      "mapred.disk.healthChecker.interval";
+  /**
+   * How often TaskTracker needs to check the health of its disks.
+   * Default value is {@link MRConstants#DEFAULT_DISK_HEALTH_CHECK_INTERVAL}
+   */
+  private long diskHealthCheckInterval;
+
+  /*
+   * A list of commitTaskActions for whom commit response has been received 
+   */
+  private List<TaskAttemptID> commitResponses = 
+            Collections.synchronizedList(new ArrayList<TaskAttemptID>());
+
+  private ShuffleServerInstrumentation shuffleServerMetrics;
+
+  private TaskTrackerInstrumentation myInstrumentation = null;
+
+  public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
+    return myInstrumentation;
+  }
+  
+  /**
+   * A list of tips that should be cleaned up.
+   */
+  private BlockingQueue<TaskTrackerAction> tasksToCleanup = 
+    new LinkedBlockingQueue<TaskTrackerAction>();
+    
+  /**
+   * A daemon-thread that pulls tips off the list of things to cleanup.
+   */
+  private Thread taskCleanupThread = 
+    new Thread(new Runnable() {
+        public void run() {
+          while (true) {
+            try {
+              TaskTrackerAction action = tasksToCleanup.take();
+              checkJobStatusAndWait(action);
+              if (action instanceof KillJobAction) {
+                purgeJob((KillJobAction) action);
+              } else if (action instanceof KillTaskAction) {
+                processKillTaskAction((KillTaskAction) action);
+              } else {
+                LOG.error("Non-delete action given to cleanup thread: "
+                          + action);
+              }
+            } catch (Throwable except) {
+              LOG.warn(StringUtils.stringifyException(except));
+            }
+          }
+        }
+      }, "taskCleanup");
+
+  void processKillTaskAction(KillTaskAction killAction) throws IOException {
+    TaskInProgress tip;
+    synchronized (TaskTracker.this) {
+      tip = tasks.get(killAction.getTaskID());
+    }
+    LOG.info("Received KillTaskAction for task: " + killAction.getTaskID());
+    purgeTask(tip, false);
+  }
+  
+  private void checkJobStatusAndWait(TaskTrackerAction action) 
+  throws InterruptedException {
+    JobID jobId = null;
+    if (action instanceof KillJobAction) {
+      jobId = ((KillJobAction)action).getJobID();
+    } else if (action instanceof KillTaskAction) {
+      jobId = ((KillTaskAction)action).getTaskID().getJobID();
+    } else {
+      return;
+    }
+    RunningJob rjob = null;
+    synchronized (runningJobs) {
+      rjob = runningJobs.get(jobId);
+    }
+    if (rjob != null) {
+      synchronized (rjob) {
+        while (rjob.localizing) {
+          rjob.wait();
+        }
+      }
+    }
+  }
+
+  public TaskController getTaskController() {
+    return taskController;
+  }
+  
+  // Currently this is used only by tests
+  void setTaskController(TaskController t) {
+    taskController = t;
+  }
+  
+  private RunningJob addTaskToJob(JobID jobId, 
+                                  TaskInProgress tip) {
+    synchronized (runningJobs) {
+      RunningJob rJob = null;
+      if (!runningJobs.containsKey(jobId)) {
+        rJob = new RunningJob(jobId);
+        rJob.tasks = new HashSet<TaskInProgress>();
+        runningJobs.put(jobId, rJob);
+      } else {
+        rJob = runningJobs.get(jobId);
+      }
+      synchronized (rJob) {
+        rJob.tasks.add(tip);
+      }
+      return rJob;
+    }
+  }
+
+  private void removeTaskFromJob(JobID jobId, TaskInProgress tip) {
+    synchronized (runningJobs) {
+      RunningJob rjob = runningJobs.get(jobId);
+      if (rjob == null) {
+        LOG.warn("Unknown job " + jobId + " being deleted.");
+      } else {
+        synchronized (rjob) {
+          rjob.tasks.remove(tip);
+        }
+      }
+    }
+  }
+
+  UserLogManager getUserLogManager() {
+    return this.userLogManager;
+  }
+
+  void setUserLogManager(UserLogManager u) {
+    this.userLogManager = u;
+  }
+
+  public static String getUserDir(String user) {
+    return TaskTracker.SUBDIR + Path.SEPARATOR + user;
+  } 
+
+  Localizer getLocalizer() {
+    return localizer;
+  }
+
+  void setLocalizer(Localizer l) {
+    localizer = l;
+  }
+
+  public static String getPrivateDistributedCacheDir(String user) {
+    return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+  }
+  
+  public static String getPublicDistributedCacheDir() {
+    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+  }
+
+  public static String getJobCacheSubdir(String user) {
+    return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE;
+  }
+
+  public static String getLocalJobDir(String user, String jobid) {
+    return getJobCacheSubdir(user) + Path.SEPARATOR + jobid;
+  }
+
+  static String getLocalJobConfFile(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
+  }
+  
+  static String getPrivateDirJobConfFile(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobConfFile(user, jobid);
+  }
+
+  static String getTaskConfFile(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
+    + Path.SEPARATOR + TaskTracker.JOBFILE;
+  }
+  
+  static String getPrivateDirTaskScriptLocation(String user, String jobid, 
+     String taskid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + 
+           getLocalTaskDir(user, jobid, taskid);
+  }
+
+  static String getJobJarsDir(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+  }
+
+  public static String getJobJarFile(String user, String jobid) {
+    return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
+  }
+  
+  static String getJobWorkDir(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
+  }
+
+  static String getLocalSplitFile(String user, String jobid, String taskid) {
+    return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+    + TaskTracker.LOCAL_SPLIT_FILE;
+  }
+
+  static String getIntermediateOutputDir(String user, String jobid,
+      String taskid) {
+    return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
+    + TaskTracker.OUTPUT;
+  }
+
+  public static String getLocalTaskDir(String user, String jobid, 
+      String taskid) {
+    return getLocalTaskDir(user, jobid, taskid, false);
+  }
+  
+  public static String getLocalTaskDir(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String taskDir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
+      taskDir = taskDir + TASK_CLEANUP_SUFFIX;
+    }
+    return taskDir;
+  }
+  
+  static String getTaskWorkDir(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt);
+    return dir + Path.SEPARATOR + MRConstants.WORKDIR;
+  }
+
+  static String getLocalJobTokenFile(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+  }
+  
+  static String getPrivateDirJobTokenFile(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + 
+           getLocalJobTokenFile(user, jobid); 
+  }
+  
+  static String getPrivateDirForJob(String user, String jobid) {
+    return TT_PRIVATE_DIR + Path.SEPARATOR + getLocalJobDir(user, jobid) ;
+  }
+
+  private FileSystem getFS(final Path filePath, JobID jobId,
+      final Configuration conf) throws IOException, InterruptedException {
+    RunningJob rJob = runningJobs.get(jobId);
+    FileSystem userFs = 
+      rJob.ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+        public FileSystem run() throws IOException {
+          return filePath.getFileSystem(conf);
+      }});
+    return userFs;
+  }
+  
+  String getPid(TaskAttemptID tid) {
+    TaskInProgress tip = tasks.get(tid);
+    if (tip != null) {
+      return jvmManager.getPid(tip.getTaskRunner());
+    }
+    return null;
+  }
+  
+  public long getProtocolVersion(String protocol, 
+                                 long clientVersion) throws IOException {
+    if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
+      return TaskUmbilicalProtocol.versionID;
+    } else {
+      throw new IOException("Unknown protocol for task tracker: " +
+                            protocol);
+    }
+  }
+
+  /**
+   * Delete all of the user directories.
+   * @param conf the TT configuration
+   * @throws IOException
+   */
+  private void deleteUserDirectories(Configuration conf) throws IOException {
+    for(String root: localStorage.getDirs()) {
+      for(FileStatus status: localFs.listStatus(new Path(root, SUBDIR))) {
+        String owner = status.getOwner();
+        String path = status.getPath().getName();
+        if (path.equals(owner)) {
+          taskController.deleteAsUser(owner, "");
+        }
+      }
+    }
+  }
+
+  public static final String TT_USER_NAME = "mapreduce.tasktracker.kerberos.principal";
+  public static final String TT_KEYTAB_FILE =
+    "mapreduce.tasktracker.keytab.file";  
+  /**
+   * Do the real constructor work here.  It's in a separate method
+   * so we can call it again and "recycle" the object after calling
+   * close().
+   */
+  synchronized void initialize() throws IOException, InterruptedException {
+    this.fConf = new JobConf(originalConf);
+    
+    LOG.info("Starting tasktracker with owner as "
+        + getMROwner().getShortUserName());
+
+    localFs = FileSystem.getLocal(fConf);
+    if (fConf.get("slave.host.name") != null) {
+      this.localHostname = fConf.get("slave.host.name");
+    }
+    if (localHostname == null) {
+      this.localHostname =
+      DNS.getDefaultHost
+      (fConf.get("mapred.tasktracker.dns.interface","default"),
+       fConf.get("mapred.tasktracker.dns.nameserver","default"));
+    }
+
+    final String dirs = localStorage.getDirsString();
+    fConf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, dirs);
+    LOG.info("Good mapred local directories are: " + dirs);
+    taskController.setConf(fConf);
+    // Setup task controller so that deletion of user dirs happens properly
+    taskController.setup(localDirAllocator, localStorage);
+    server.setAttribute("conf", fConf);
+
+    deleteUserDirectories(fConf);
+
+    // NB: deleteLocalFiles uses the configured local dirs, but does not 
+    // fail if a local directory has failed. 
+    fConf.deleteLocalFiles(SUBDIR);
+    final FsPermission ttdir = FsPermission.createImmutable((short) 0755);
+    for (String s : localStorage.getDirs()) {
+      localFs.mkdirs(new Path(s, SUBDIR), ttdir);
+    }
+    fConf.deleteLocalFiles(TT_PRIVATE_DIR);
+    final FsPermission priv = FsPermission.createImmutable((short) 0700);
+    for (String s : localStorage.getDirs()) {
+      localFs.mkdirs(new Path(s, TT_PRIVATE_DIR), priv);
+    }
+    fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
+    final FsPermission pub = FsPermission.createImmutable((short) 0755);
+    for (String s : localStorage.getDirs()) {
+      localFs.mkdirs(new Path(s, TT_LOG_TMP_DIR), pub);
+    }
+    // Create userlogs directory under all good mapred-local-dirs
+    for (String s : localStorage.getDirs()) {
+      Path userLogsDir = new Path(s, TaskLog.USERLOGS_DIR_NAME);
+      if (!localFs.exists(userLogsDir)) {
+        localFs.mkdirs(userLogsDir, pub);
+      }
+    }
+    // Clear out state tables
+    this.tasks.clear();
+    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
+    this.runningJobs = new TreeMap<JobID, RunningJob>();
+    this.mapTotal = 0;
+    this.reduceTotal = 0;
+    this.acceptNewTasks = true;
+    this.status = null;
+
+    this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
+    this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
+    //tweak the probe sample size (make it a function of numCopiers)
+    probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);
+
+    createInstrumentation();
+
+    // bind address
+    String address = 
+      NetUtils.getServerAddress(fConf,
+                                "mapred.task.tracker.report.bindAddress", 
+                                "mapred.task.tracker.report.port", 
+                                "mapred.task.tracker.report.address");
+    InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
+    String bindAddress = socAddr.getHostName();
+    int tmpPort = socAddr.getPort();
+    
+    this.jvmManager = new JvmManager(this);
+
+    // Set service-level authorization security policy
+    if (this.fConf.getBoolean(
+          ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
+      PolicyProvider policyProvider = 
+        (PolicyProvider)(ReflectionUtils.newInstance(
+            this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                MapReducePolicyProvider.class, PolicyProvider.class), 
+            this.fConf));
+      ServiceAuthorizationManager.refresh(fConf, policyProvider);
+    }
+    
+    // RPC initialization
+    int max = maxMapSlots > maxReduceSlots ? 
+                       maxMapSlots : maxReduceSlots;
+    //set the num handlers to max*2 since canCommit may wait for the duration
+    //of a heartbeat RPC
+    this.taskReportServer = RPC.getServer(this, bindAddress,
+        tmpPort, 2 * max, false, this.fConf, this.jobTokenSecretManager);
+    this.taskReportServer.start();
+
+    // get the assigned address
+    this.taskReportAddress = taskReportServer.getListenerAddress();
+    this.fConf.set("mapred.task.tracker.report.address",
+        taskReportAddress.getHostName() + ":" + taskReportAddress.getPort());
+    LOG.info("TaskTracker up at: " + this.taskReportAddress);
+
+    this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
+    LOG.info("Starting tracker " + taskTrackerName);
+
+    // Initialize DistributedCache
+    this.distributedCacheManager = new TrackerDistributedCacheManager(
+        this.fConf, taskController);
+    this.distributedCacheManager.startCleanupThread();
+    
+    this.jobClient = (InterTrackerProtocol) 
+    UserGroupInformation.getLoginUser().doAs(
+        new PrivilegedExceptionAction<Object>() {
+      public Object run() throws IOException {
+        return RPC.waitForProxy(InterTrackerProtocol.class,
+            InterTrackerProtocol.versionID,
+            jobTrackAddr, fConf);
+      }
+    });
+    this.justInited = true;
+    this.running = true;    
+    // start the thread that will fetch map task completion events
+    this.mapEventsFetcher = new MapEventsFetcherThread();
+    mapEventsFetcher.setDaemon(true);
+    mapEventsFetcher.setName(
+                             "Map-events fetcher for all reduce tasks " + "on " + 
+                             taskTrackerName);
+    mapEventsFetcher.start();
+
+    initializeMemoryManagement();
+
+    getUserLogManager().clearOldUserLogs(fConf);
+
+    setIndexCache(new IndexCache(this.fConf));
+
+    mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
+    reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
+    mapLauncher.start();
+    reduceLauncher.start();
+
+    // create a localizer instance
+    setLocalizer(new Localizer(localFs, localStorage.getDirs()));
+
+    //Start up node health checker service.
+    if (shouldStartHealthMonitor(this.fConf)) {
+      startHealthMonitor(this.fConf);
+    }
+    
+    oobHeartbeatOnTaskCompletion = 
+      fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
+  }
+
+  private void createInstrumentation() {
+    Class<? extends TaskTrackerInstrumentation> metricsInst =
+        getInstrumentationClass(fConf);
+    LOG.debug("instrumentation class="+ metricsInst);
+    if (metricsInst == null) {
+      myInstrumentation = TaskTrackerInstrumentation.create(this);
+      return;
+    }
+    try {
+      java.lang.reflect.Constructor<? extends TaskTrackerInstrumentation> c =
+        metricsInst.getConstructor(new Class<?>[] {TaskTracker.class} );
+      this.myInstrumentation = c.newInstance(this);
+    } catch(Exception e) {
+      //Reflection can throw lots of exceptions -- handle them all by
+      //falling back on the default.
+      LOG.error("failed to initialize taskTracker metrics", e);
+      this.myInstrumentation = TaskTrackerInstrumentation.create(this);
+    }
+
+  }
+
+  UserGroupInformation getMROwner() {
+    return aclsManager.getMROwner();
+  }
+
+  /**
+   * Are ACLs for authorization checks enabled on the TT ?
+   */
+  boolean areACLsEnabled() {
+    return fConf.getBoolean(JobConf.MR_ACLS_ENABLED, false);
+  }
+
+  static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
+    Configuration conf) {
+    return conf.getClass("mapred.tasktracker.instrumentation", null,
+                         TaskTrackerInstrumentation.class);
+  }
+
+  static void setInstrumentationClass(
+    Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
+    conf.setClass("mapred.tasktracker.instrumentation",
+        t, TaskTrackerInstrumentation.class);
+  }
+  
+  /** 
+   * Removes all contents of temporary storage.  Called upon 
+   * startup, to remove any leftovers from previous run.
+   */
+  public void cleanupStorage() throws IOException {
+    this.fConf.deleteLocalFiles(SUBDIR);
+    this.fConf.deleteLocalFiles(TT_PRIVATE_DIR);
+    this.fConf.deleteLocalFiles(TT_LOG_TMP_DIR);
+  }
+
+  // Object on wait which MapEventsFetcherThread is going to wait.
+  private Object waitingOn = new Object();
+
+  private class MapEventsFetcherThread extends Thread {
+
+    private List <FetchStatus> reducesInShuffle() {
+      List <FetchStatus> fList = new ArrayList<FetchStatus>();
+      for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) {
+        RunningJob rjob = item.getValue();
+        if (!rjob.localized) {
+          continue;
+        }
+        JobID jobId = item.getKey();
+        FetchStatus f;
+        synchronized (rjob) {
+          f = rjob.getFetchStatus();
+          for (TaskInProgress tip : rjob.tasks) {
+            Task task = tip.getTask();
+            if (!task.isMapTask()) {
+              if (((ReduceTask)task).getPhase() == 
+                  TaskStatus.Phase.SHUFFLE) {
+                if (rjob.getFetchStatus() == null) {
+                  //this is a new job; we start fetching its map events
+                  f = new FetchStatus(jobId, 
+                                      ((ReduceTask)task).getNumMaps());
+                  rjob.setFetchStatus(f);
+                }
+                f = rjob.getFetchStatus();
+                fList.add(f);
+                break; //no need to check any more tasks belonging to this
+              }
+            }
+          }
+        }
+      }
+      //at this point, we have information about for which of
+      //the running jobs do we need to query the jobtracker for map 
+      //outputs (actually map events).
+      return fList;
+    }
+      
+    @Override
+    public void run() {
+      LOG.info("Starting thread: " + this.getName());
+        
+      while (running) {
+        try {
+          List <FetchStatus> fList = null;
+          synchronized (runningJobs) {
+            while (((fList = reducesInShuffle()).size()) == 0) {
+              try {
+                runningJobs.wait();
+              } catch (InterruptedException e) {
+                LOG.info("Shutting down: " + this.getName());
+                return;
+              }
+            }
+          }
+          // now fetch all the map task events for all the reduce tasks
+          // possibly belonging to different jobs
+          boolean fetchAgain = false; //flag signifying whether we want to fetch
+                                      //immediately again.
+          for (FetchStatus f : fList) {
+            long currentTime = System.currentTimeMillis();
+            try {
+              //the method below will return true when we have not 
+              //fetched all available events yet
+              if (f.fetchMapCompletionEvents(currentTime)) {
+                fetchAgain = true;
+              }
+            } catch (Exception e) {
+              LOG.warn(
+                       "Ignoring exception that fetch for map completion" +
+                       " events threw for " + f.jobId + " threw: " +
+                       StringUtils.stringifyException(e)); 
+            }
+            if (!running) {
+              break;
+            }
+          }
+          synchronized (waitingOn) {
+            try {
+              if (!fetchAgain) {
+                waitingOn.wait(heartbeatInterval);
+              }
+            } catch (InterruptedException ie) {
+              LOG.info("Shutting down: " + this.getName());
+              return;
+            }
+          }
+        } catch (Exception e) {
+          LOG.info("Ignoring exception "  + e.getMessage());
+        }
+      }
+    } 
+  }
+
+  private class FetchStatus {
+    /** The next event ID that we will start querying the JobTracker from*/
+    private IntWritable fromEventId;
+    /** This is the cache of map events for a given job */ 
+    private List<TaskCompletionEvent> allMapEvents;
+    /** What jobid this fetchstatus object is for*/
+    private JobID jobId;
+    private long lastFetchTime;
+    private boolean fetchAgain;
+     
+    public FetchStatus(JobID jobId, int numMaps) {
+      this.fromEventId = new IntWritable(0);
+      this.jobId = jobId;
+      this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
+    }
+      
+    /**
+     * Reset the events obtained so far.
+     */
+    public void reset() {
+      // Note that the sync is first on fromEventId and then on allMapEvents
+      synchronized (fromEventId) {
+        synchronized (allMapEvents) {
+          fromEventId.set(0); // set the new index for TCE
+          allMapEvents.clear();
+        }
+      }
+    }
+    
+    public TaskCompletionEvent[] getMapEvents(int fromId, int max) {
+        
+      TaskCompletionEvent[] mapEvents = 
+        TaskCompletionEvent.EMPTY_ARRAY;
+      boolean notifyFetcher = false; 
+      synchronized (allMapEvents) {
+        if (allMapEvents.size() > fromId) {
+          int actualMax = Math.min(max, (allMapEvents.size() - fromId));
+          List <TaskCompletionEvent> eventSublist = 
+            allMapEvents.subList(fromId, actualMax + fromId);
+          mapEvents = eventSublist.toArray(mapEvents);
+        } else {
+          // Notify Fetcher thread. 
+          notifyFetcher = true;
+        }
+      }
+      if (notifyFetcher) {
+        synchronized (waitingOn) {
+          waitingOn.notify();
+        }
+      }
+      return mapEvents;
+    }
+      
+    public boolean fetchMapCompletionEvents(long currTime) throws IOException {
+      if (!fetchAgain && (currTime - lastFetchTime) < heartbeatInterval) {
+        return false;
+      }
+      int currFromEventId = 0;
+      synchronized (fromEventId) {
+        currFromEventId = fromEventId.get();
+        List <TaskCompletionEvent> recentMapEvents = 
+          queryJobTracker(fromEventId, jobId, jobClient);
+        synchronized (allMapEvents) {
+          allMapEvents.addAll(recentMapEvents);
+        }
+        lastFetchTime = currTime;
+        if (fromEventId.get() - currFromEventId >= probe_sample_size) {
+          //return true when we have fetched the full payload, indicating
+          //that we should fetch again immediately (there might be more to
+          //fetch
+          fetchAgain = true;
+          return true;
+        }
+      }
+      fetchAgain = false;
+      return false;
+    }
+  }
+
+  private static LocalDirAllocator lDirAlloc = 
+                              new LocalDirAllocator("mapred.local.dir");
+
+  // intialize the job directory
+  RunningJob localizeJob(TaskInProgress tip) 
+  throws IOException, InterruptedException {
+    Task t = tip.getTask();
+    JobID jobId = t.getJobID();
+    RunningJob rjob = addTaskToJob(jobId, tip);
+    InetSocketAddress ttAddr = getTaskTrackerReportAddress();
+    try {
+      synchronized (rjob) {
+        if (!rjob.localized) {
+          while (rjob.localizing) {
+            rjob.wait();
+          }
+          if (!rjob.localized) {
+            //this thread is localizing the job
+            rjob.localizing = true;
+          }
+        }
+      }
+      if (!rjob.localized) {
+        Path localJobConfPath = initializeJob(t, rjob, ttAddr);
+        JobConf localJobConf = new JobConf(localJobConfPath);
+        //to be doubly sure, overwrite the user in the config with the one the TT 
+        //thinks it is
+        localJobConf.setUser(t.getUser());
+        //also reset the #tasks per jvm
+        resetNumTasksPerJvm(localJobConf);
+        //set the base jobconf path in rjob; all tasks will use
+        //this as the base path when they run
+        synchronized (rjob) {
+          rjob.localizedJobConf = localJobConfPath;
+          rjob.jobConf = localJobConf;  
+          rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
+              localJobConf.getKeepFailedTaskFiles());
+
+          rjob.localized = true;
+        }
+      } 
+    } finally {
+      synchronized (rjob) {
+        if (rjob.localizing) {
+          rjob.localizing = false;
+          rjob.notifyAll();
+        }
+      }
+    }
+    synchronized (runningJobs) {
+      runningJobs.notify(); //notify the fetcher thread
+    }
+    return rjob;
+  }
+
+  /**
+   * Localize the job on this tasktracker. Specifically
+   * <ul>
+   * <li>Cleanup and create job directories on all disks</li>
+   * <li>Download the credentials file</li>
+   * <li>Download the job config file job.xml from the FS</li>
+   * <li>Invokes the {@link TaskController} to do the rest of the job 
+   * initialization</li>
+   * </ul>
+   *
+   * @param t task whose job has to be localized on this TT
+   * @param rjob the {@link RunningJob}
+   * @param ttAddr the tasktracker's RPC address
+   * @return the path to the job configuration to be used for all the tasks
+   *         of this job as a starting point.
+   * @throws IOException
+   */
+  Path initializeJob(final Task t, final RunningJob rjob, 
+      final InetSocketAddress ttAddr)
+  throws IOException, InterruptedException {
+    final JobID jobId = t.getJobID();
+
+    final Path jobFile = new Path(t.getJobFile());
+    final String userName = t.getUser();
+    final Configuration conf = getJobConf();
+
+    // save local copy of JobToken file
+    final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
+    synchronized (rjob) {
+      rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
+
+      Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
+      Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
+      if (jt != null) { //could be null in the case of some unit tests
+        getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
+      }
+      for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+        rjob.ugi.addToken(token);
+      }
+    }
+
+    FileSystem userFs = getFS(jobFile, jobId, conf);
+
+    // Download the job.xml for this job from the system FS
+    final Path localJobFile =
+      localizeJobConfFile(new Path(t.getJobFile()), userName, userFs, jobId);
+
+    /**
+      * Now initialize the job via task-controller to do the rest of the
+      * job-init. Do this within a doAs since the public distributed cache 
+      * is also set up here.
+      * To support potential authenticated HDFS accesses, we need the tokens
+      */
+    rjob.ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      public Object run() throws IOException, InterruptedException {
+        try {
+          final JobConf localJobConf = new JobConf(localJobFile);
+          // Setup the public distributed cache
+          TaskDistributedCacheManager taskDistributedCacheManager =
+            getTrackerDistributedCacheManager()
+           .newTaskDistributedCacheManager(jobId, localJobConf);
+          rjob.distCacheMgr = taskDistributedCacheManager;
+          taskDistributedCacheManager.setupCache(localJobConf,
+            TaskTracker.getPublicDistributedCacheDir(),
+            TaskTracker.getPrivateDistributedCacheDir(userName));
+
+          // Set some config values
+          localJobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+              getJobConf().get(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+          if (conf.get("slave.host.name") != null) {
+            localJobConf.set("slave.host.name", conf.get("slave.host.name"));
+          }
+          resetNumTasksPerJvm(localJobConf);
+          localJobConf.setUser(t.getUser());
+
+          // write back the config (this config will have the updates that the
+          // distributed cache manager makes as well)
+          JobLocalizer.writeLocalJobFile(localJobFile, localJobConf);
+          taskController.initializeJob(t.getUser(), jobId.toString(), 
+              new Path(localJobTokenFile), localJobFile, TaskTracker.this,
+              ttAddr);
+        } catch (IOException e) {
+          LOG.warn("Exception while localization " + 
+              StringUtils.stringifyException(e));
+          throw e;
+        } catch (InterruptedException ie) {
+          LOG.warn("Exception while localization " + 
+              StringUtils.stringifyException(ie));
+          throw ie;
+        }
+        return null;
+      }
+    });
+    //search for the conf that the initializeJob created
+    //need to look up certain configs from this conf, like
+    //the distributed cache, profiling, etc. ones
+    Path initializedConf = lDirAlloc.getLocalPathToRead(getLocalJobConfFile(
+           userName, jobId.toString()), getJobConf());
+    return initializedConf;
+  }
+  
+  /** If certain configs are enabled, the jvm-reuse should be disabled
+   * @param localJobConf
+   */
+  static void resetNumTasksPerJvm(JobConf localJobConf) {
+    boolean debugEnabled = false;
+    if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+      return;
+    }
+    if (localJobConf.getMapDebugScript() != null || 
+        localJobConf.getReduceDebugScript() != null) {
+      debugEnabled = true;
+    }
+    String keepPattern = localJobConf.getKeepTaskFilesPattern();
+    
+    if (debugEnabled || localJobConf.getProfileEnabled() ||
+        keepPattern != null || localJobConf.getKeepFailedTaskFiles()) {
+      //disable jvm reuse
+      localJobConf.setNumTasksToExecutePerJvm(1);
+    }
+  }
+
+  // Remove the log dir from the tasklog cleanup thread
+  void saveLogDir(JobID jobId, JobConf localJobConf)
+      throws IOException {
+    // remove it from tasklog cleanup thread first,
+    // it might be added there because of tasktracker reinit or restart
+    JobStartedEvent jse = new JobStartedEvent(jobId);
+    getUserLogManager().addLogEvent(jse);
+  }
+
+  
+  /**
+   * Download the job configuration file from the FS.
+   *
+   * @param jobFile the original location of the configuration file
+   * @param user the user in question
+   * @param userFs the FileSystem created on behalf of the user
+   * @param jobId jobid in question
+   * @return the local file system path of the downloaded file.
+   * @throws IOException
+   */
+  private Path localizeJobConfFile(Path jobFile, String user, 
+      FileSystem userFs, JobID jobId)
+  throws IOException {
+    // Get sizes of JobFile and JarFile
+    // sizes are -1 if they are not present.
+    FileStatus status = null;
+    long jobFileSize = -1;
+    try {
+      status = userFs.getFileStatus(jobFile);
+      jobFileSize = status.getLen();
+    } catch(FileNotFoundException fe) {
+      jobFileSize = -1;
+    }
+    Path localJobFile =
+      lDirAlloc.getLocalPathForWrite(getPrivateDirJobConfFile(user,
+          jobId.toString()), jobFileSize, fConf);
+
+    // Download job.xml
+    userFs.copyToLocalFile(jobFile, localJobFile);
+    return localJobFile;
+  }
+
+  protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,
+                                RunningJob rjob) throws IOException {
+    synchronized (tip) {
+      jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,
+                  localStorage.getDirsString());
+      tip.setJobConf(jobConf);
+      tip.setUGI(rjob.ugi);
+      tip.launchTask(rjob);
+    }
+  }
+    
+  public synchronized void shutdown() throws IOException, InterruptedException {
+    shuttingDown = true;
+    close();
+    if (this.server != null) {
+      try {
+        LOG.info("Shutting down StatusHttpServer");
+        this.server.stop();
+      } catch (Exception e) {
+        LOG.warn("Exception shutting down TaskTracker", e);
+      }
+    }
+  }
+  /**
+   * Close down the TaskTracker and all its components.  We must also shutdown
+   * any running tasks or threads, and cleanup disk space.  A new TaskTracker
+   * within the same process space might be restarted, so everything must be
+   * clean.
+   * @throws InterruptedException 
+   */
+  public synchronized void close() throws IOException, InterruptedException {
+    //
+    // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
+    // because calling jobHasFinished() may result in an edit to 'tasks'.
+    //
+    TreeMap<TaskAttemptID, TaskInProgress> tasksToClose =
+      new TreeMap<TaskAttemptID, TaskInProgress>();
+    tasksToClose.putAll(tasks);
+    for (TaskInProgress tip : tasksToClose.values()) {
+      tip.jobHasFinished(false);
+    }
+    
+    this.running = false;
+
+    // Clear local storage
+    cleanupStorage();
+        
+    // Shutdown the fetcher thread
+    this.mapEventsFetcher.interrupt();
+    
+    //stop the launchers
+    this.mapLauncher.interrupt();
+    this.reduceLauncher.interrupt();
+
+    this.distributedCacheManager.stopCleanupThread();
+    jvmManager.stop();
+    
+    // shutdown RPC connections
+    RPC.stopProxy(jobClient);
+
+    // wait for the fetcher thread to exit
+    for (boolean done = false; !done; ) {
+      try {
+        this.mapEventsFetcher.join();
+        done = true;
+      } catch (InterruptedException e) {
+      }
+    }
+    
+    if (taskReportServer != null) {
+      taskReportServer.stop();
+      taskReportServer = null;
+    }
+    if (healthChecker != null) {
+      //stop node health checker service
+      healthChecker.stop();
+      healthChecker = null;
+    }
+  }
+
+  /**
+   * For testing
+   */
+  TaskTracker() {
+    server = null;
+    workerThreads = 0;
+    mapRetainSize = TaskLogsTruncater.DEFAULT_RETAIN_SIZE;
+    reduceRetainSize = TaskLogsTruncater.DEFAULT_RETAIN_SIZE;
+  }
+
+  void setConf(JobConf conf) {
+    fConf = conf;
+  }
+
+  /**
+   * Start with the local machine name, and the default JobTracker
+   */
+  public TaskTracker(JobConf conf) throws IOException, InterruptedException {
+    originalConf = conf;
+    FILE_CACHE_SIZE = conf.getInt("mapred.tasktracker.file.cache.size", 2000);
+    maxMapSlots = conf.getInt(
+                  "mapred.tasktracker.map.tasks.maximum", 2);
+    maxReduceSlots = conf.getInt(
+                  "mapred.tasktracker.reduce.tasks.maximum", 2);
+    diskHealthCheckInterval = conf.getLong(DISK_HEALTH_CHECK_INTERVAL_PROPERTY,
+                                           DEFAULT_DISK_HEALTH_CHECK_INTERVAL);
+    UserGroupInformation.setConfiguration(originalConf);
+    aclsManager = new ACLsManager(conf, new JobACLsManager(conf), null);
+    this.jobTrackAddr = JobTracker.getAddress(conf);
+    String infoAddr = 
+      NetUtils.getServerAddress(conf,
+                                "tasktracker.http.bindAddress", 
+                                "tasktracker.http.port",
+                                "mapred.task.tracker.http.address");
+    InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
+    String httpBindAddress = infoSocAddr.getHostName();
+    int httpPort = infoSocAddr.getPort();
+    this.server = new HttpServer("task", httpBindAddress, httpPort,
+        httpPort == 0, conf, aclsManager.getAdminsAcl());
+    workerThreads = conf.getInt("tasktracker.http.threads", 40);
+    server.setThreads(1, workerThreads);
+    // let the jsp pages get to the task tracker, config, and other relevant
+    // objects
+    FileSystem local = FileSystem.getLocal(conf);
+    this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+    Class<? extends TaskController> taskControllerClass = 
+      conf.getClass("mapred.task.tracker.task-controller", 
+                     DefaultTaskController.class, TaskController.class);
+
+    fConf = new JobConf(conf);
+    localStorage = new LocalStorage(fConf.getLocalDirs());
+    localStorage.checkDirs();
+    taskController = 
+      (TaskController) ReflectionUtils.newInstance(taskControllerClass, fConf);
+    taskController.setup(localDirAllocator, localStorage);
+    lastNumFailures = localStorage.numFailures();
+
+    // create user log manager
+    setUserLogManager(new UserLogManager(conf, taskController));
+    SecurityUtil.login(originalConf, TT_KEYTAB_FILE, TT_USER_NAME);
+
+    initialize();
+    this.shuffleServerMetrics = ShuffleServerInstrumentation.create(this);
+    server.setAttribute("task.tracker", this);
+    server.setAttribute("local.file.system", local);
+
+    server.setAttribute("log", LOG);
+    server.setAttribute("localDirAllocator", localDirAllocator);
+    server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+
+    String exceptionStackRegex =
+      conf.get("mapreduce.reduce.shuffle.catch.exception.stack.regex");
+    String exceptionMsgRegex =
+      conf.get("mapreduce.reduce.shuffle.catch.exception.message.regex");
+
+    server.setAttribute("exceptionStackRegex", exceptionStackRegex);
+    server.setAttribute("exceptionMsgRegex", exceptionMsgRegex);
+
+    server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
+    server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
+    server.start();
+    this.httpPort = server.getPort();
+    checkJettyPort(httpPort);
+    LOG.info("FILE_CACHE_SIZE for mapOutputServlet set to : " + FILE_CACHE_SIZE);
+    mapRetainSize = conf.getLong(TaskLogsTruncater.MAP_USERLOG_RETAIN_SIZE, 
+        TaskLogsTruncater.DEFAULT_RETAIN_SIZE);
+    reduceRetainSize = conf.getLong(TaskLogsTruncater.REDUCE_USERLOG_RETAIN_SIZE,
+        TaskLogsTruncater.DEFAULT_RETAIN_SIZE);
+  }
+
+  private void checkJettyPort(int port) throws IOException { 
+    //See HADOOP-4744
+    if (port < 0) {
+      shuttingDown = true;
+      throw new IOException("Jetty problem. Jetty didn't bind to a " +
+      		"valid port");
+    }
+  }
+  
+  private void startCleanupThreads() throws IOException {
+    taskCleanupThread.setDaemon(true);
+    taskCleanupThread.start();
+    directoryCleanupThread = CleanupQueue.getInstance();
+  }
+
+  // only used by tests
+  void setCleanupThread(CleanupQueue c) {
+    directoryCleanupThread = c;
+  }
+  
+  CleanupQueue getCleanupThread() {
+    return directoryCleanupThread;
+  }
+
+  /**
+   * The connection to the JobTracker, used by the TaskRunner 
+   * for locating remote files.
+   */
+  public InterTrackerProtocol getJobClient() {
+    return jobClient;
+  }
+        
+  /** Return the port at which the tasktracker bound to */
+  public synchronized InetSocketAddress getTaskTrackerReportAddress() {
+    return taskReportAddress;
+  }
+    
+  /** Queries the job tracker for a set of outputs ready to be copied
+   * @param fromEventId the first event ID we want to start from, this is
+   * modified by the call to this method
+   * @param jobClient the job tracker
+   * @return a set of locations to copy outputs from
+   * @throws IOException
+   */  
+  private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
+                                                    JobID jobId,
+                                                    InterTrackerProtocol jobClient)
+    throws IOException {
+
+    TaskCompletionEvent t[] = jobClient.getTaskCompletionEvents(
+                                                                jobId,
+                                                                fromEventId.get(),
+                                                                probe_sample_size);
+    //we are interested in map task completion events only. So store
+    //only those
+    List <TaskCompletionEvent> recentMapEvents = 
+      new ArrayList<TaskCompletionEvent>();
+    for (int i = 0; i < t.length; i++) {
+      if (t[i].isMap) {
+        recentMapEvents.add(t[i]);
+      }
+    }
+    fromEventId.set(fromEventId.get() + t.length);
+    return recentMapEvents;
+  }
+
+  /**
+   * Main service loop.  Will stay in this loop forever.
+   */
+  State offerService() throws Exception {
+    long lastHeartbeat = 0;
+
+    while (running && !shuttingDown) {
+      try {
+        long now = System.currentTimeMillis();
+
+        long waitTime = heartbeatInterval - (now - lastHeartbeat);
+        if (waitTime > 0) {
+          // sleeps for the wait time or 
+          // until there are empty slots to schedule tasks
+          synchronized (finishedCount) {
+            if (finishedCount.get() == 0) {
+              finishedCount.wait(waitTime);
+            }
+            finishedCount.set(0);
+          }
+        }
+
+        // If the TaskTracker is just starting up:
+        // 1. Verify the buildVersion
+        // 2. Get the system directory & filesystem
+        if(justInited) {
+          String jobTrackerBV = jobClient.getBuildVersion();
+          if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
+            String msg = "Shutting down. Incompatible buildVersion." +
+            "\nJobTracker's: " + jobTrackerBV + 
+            "\nTaskTracker's: "+ VersionInfo.getBuildVersion();
+            LOG.error(msg);
+            try {
+              jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
+            } catch(Exception e ) {
+              LOG.info("Problem reporting to jobtracker: " + e);
+            }
+            return State.DENIED;
+          }
+          
+          String dir = jobClient.getSystemDir();
+          if (dir == null) {
+            throw new IOException("Failed to get system directory");
+          }
+          systemDirectory = new Path(dir);
+          systemFS = systemDirectory.getFileSystem(fConf);
+        }
+
+        now = System.currentTimeMillis();
+        if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
+          localStorage.checkDirs();
+          lastCheckDirsTime = now;
+          int numFailures = localStorage.numFailures();
+          // Re-init the task tracker if there were any new failures
+          if (numFailures > lastNumFailures) {
+            lastNumFailures = numFailures;
+            return State.STALE;
+          }
+        }
+
+        // Send the heartbeat and process the jobtracker's directives
+        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
+
+        // Note the time when the heartbeat returned, use this to decide when to send the
+        // next heartbeat   
+        lastHeartbeat = System.currentTimeMillis();
+        
+        // Check if the map-event list needs purging
+        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
+        if (jobs.size() > 0) {
+          synchronized (this) {
+            // purge the local map events list
+            for (JobID job : jobs) {
+              RunningJob rjob;
+              synchronized (runningJobs) {
+                rjob = runningJobs.get(job);          
+                if (rjob != null) {
+                  synchronized (rjob) {
+                    FetchStatus f = rjob.getFetchStatus();
+                    if (f != null) {
+                      f.reset();
+                    }
+                  }
+                }
+              }
+            }
+
+            // Mark the reducers in shuffle for rollback
+            synchronized (shouldReset) {
+              for (Map.Entry<TaskAttemptID, TaskInProgress> entry 
+                   : runningTasks.entrySet()) {
+                if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
+                  this.shouldReset.add(entry.getKey());
+                }
+              }
+            }
+          }
+        }
+        
+        TaskTrackerAction[] actions = heartbeatResponse.getActions();
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + 
+                    heartbeatResponse.getResponseId() + " and " + 
+                    ((actions != null) ? actions.length : 0) + " actions");
+        }
+        if (reinitTaskTracker(actions)) {
+          return State.STALE;
+        }
+            
+        // resetting heartbeat interval from the response.
+        heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
+        justStarted = false;
+        justInited = false;
+        if (actions != null){ 
+          for(TaskTrackerAction action: actions) {
+            if (action instanceof LaunchTaskAction) {
+              addToTaskQueue((LaunchTaskAction)action);
+            } else if (action instanceof CommitTaskAction) {
+              CommitTaskAction commitAction = (CommitTaskAction)action;
+              if (!commitResponses.contains(commitAction.getTaskID())) {
+                LOG.info("Received commit task action for " + 
+                          commitAction.getTaskID());
+                commitResponses.add(commitAction.getTaskID());
+              }
+            } else {
+              tasksToCleanup.put(action);
+            }
+          }
+        }
+        markUnresponsiveTasks();
+        killOverflowingTasks();
+            
+        //we've cleaned up, resume normal operation
+        if (!acceptNewTasks && isIdle()) {
+          acceptNewTasks=true;
+        }
+        //The check below may not be required every iteration but we are 
+        //erring on the side of caution here. We have seen many cases where
+        //the call to jetty's getLocalPort() returns different values at 
+        //different times. Being a real paranoid here.
+        checkJettyPort(server.getPort());
+      } catch (InterruptedException ie) {
+        LOG.info("Interrupted. Closing down.");
+        return State.INTERRUPTED;
+      } catch (DiskErrorException de) {
+        String msg = "Exiting task tracker for disk error:\n" +
+          StringUtils.stringifyException(de);
+        LOG.error(msg);
+        synchronized (this) {
+          jobClient.reportTaskTrackerError(taskTrackerName, 
+                                           "DiskErrorException", msg);
+        }
+        return State.STALE;
+      } catch (RemoteException re) {
+        String reClass = re.getClassName();
+        if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
+          LOG.info("Tasktracker disallowed by JobTracker.");
+          return State.DENIED;
+        }
+      } catch (Exception except) {
+        String msg = "Caught exception: " + 
+          StringUtils.stringifyException(except);
+        LOG.error(msg);
+      }
+    }
+
+    return State.NORMAL;
+  }
+
+  private long previousUpdate = 0;
+
+  void setIndexCache(IndexCache cache) {
+    this.indexCache = cache;
+  }
+
+  /**
+   * Build and transmit the heart beat to the JobTracker
+   * @param now current time
+   * @return false if the tracker was unknown
+   * @throws IOException
+   */
+  HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+    // Send Counters in the status once every COUNTER_UPDATE_INTERVAL
+    boolean sendCounters;
+    if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
+      sendCounters = true;
+      previousUpdate = now;
+    }
+    else {
+      sendCounters = false;
+    }
+
+    // 
+    // Check if the last heartbeat got through... 
+    // if so then build the heartbeat information for the JobTracker;
+    // else resend the previous status information.
+    //
+    if (status == null) {
+      synchronized (this) {
+        status = new TaskTrackerStatus(taskTrackerName, localHostname, 
+                                       httpPort, 
+                                       cloneAndResetRunningTaskStatuses(
+                                         sendCounters), 
+                                       failures, 
+                                       maxMapSlots,
+                                       maxReduceSlots); 
+      }
+    } else {
+      LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
+               "' with reponseId '" + heartbeatResponseId);
+    }
+      
+    //
+    // Check if we should ask for a new Task
+    //
+    boolean askForNewTask;
+    long localMinSpaceStart;
+    synchronized (this) {
+      askForNewTask = 
+        ((status.countOccupiedMapSlots() < maxMapSlots || 
+          status.countOccupiedReduceSlots() < maxReduceSlots) && 
+         acceptNewTasks); 
+      localMinSpaceStart = minSpaceStart;
+    }
+    if (askForNewTask) {
+      askForNewTask = enoughFreeSpace(localMinSpaceStart);
+      long freeDiskSpace = getFreeSpace();
+      long totVmem = getTotalVirtualMemoryOnTT();
+      long totPmem = getTotalPhysicalMemoryOnTT();
+
+      status.getResourceStatus().setAvailableSpace(freeDiskSpace);
+      status.getResourceStatus().setTotalVirtualMemory(totVmem);
+      status.getResourceStatus().setTotalPhysicalMemory(totPmem);
+      status.getResourceStatus().setMapSlotMemorySizeOnTT(
+          mapSlotMemorySizeOnTT);
+      status.getResourceStatus().setReduceSlotMemorySizeOnTT(
+          reduceSlotSizeMemoryOnTT);
+    }
+    //add node health information
+    
+    TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
+    synchronized (this) {
+      if (healthChecker != null) {
+        healthChecker.setHealthStatus(healthStatus);
+      } else {
+        healthStatus.setNodeHealthy(true);
+        healthStatus.setLastReported(0L);
+        healthStatus.setHealthReport("");
+      }
+    }
+    //
+    // Xmit the heartbeat
+    //
+    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
+                                                              justStarted,
+                                                              justInited,
+                                                              askForNewTask, 
+                                                              heartbeatResponseId);
+      
+    //
+    // The heartbeat got through successfully!
+    //
+    heartbeatResponseId = heartbeatResponse.getResponseId();
+      
+    synchronized (this) {
+      for (TaskStatus taskStatus : status.getTaskReports()) {
+        if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            !taskStatus.inTaskCleanupPhase()) {
+          if (taskStatus.getIsMap()) {
+            mapTotal--;
+          } else {
+            reduceTotal--;
+          }
+          myInstrumentation.completeTask(taskStatus.getTaskID());
+          runningTasks.remove(taskStatus.getTaskID());
+        }
+      }
+      
+      // Clear transient status information which should only
+      // be sent once to the JobTracker
+      for (TaskInProgress tip: runningTasks.values()) {
+        tip.getStatus().clearStatus();
+      }
+    }
+
+    // Force a rebuild of 'status' on the next iteration
+    status = null;                                
+
+    return heartbeatResponse;
+  }
+
+  /**
+   * Return the total virtual memory available on this TaskTracker.
+   * @return total size of virtual memory.
+   */
+  long getTotalVirtualMemoryOnTT() {
+    return totalVirtualMemoryOnTT;
+  }
+
+  /**
+   * Return the total physical memory available on this TaskTracker.
+   * @return total size of physical memory.
+   */
+  long getTotalPhysicalMemoryOnTT() {
+    return totalPhysicalMemoryOnTT;
+  }
+
+  long getTotalMemoryAllottedForTasksOnTT() {
+    return totalMemoryAllottedForTasks;
+  }
+
+  long getRetainSize(org.apache.hadoop.mapreduce.TaskAttemptID tid) {
+    return tid.isMap() ? mapRetainSize : reduceRetainSize;
+  }
+  
+  /**
+   * Check if the jobtracker directed a 'reset' of the tasktracker.
+   * 
+   * @param actions the directives of the jobtracker for the tasktracker.
+   * @return <code>true</code> if tasktracker is to be reset, 
+   *         <code>false</code> otherwise.
+   */
+  private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
+    if (actions != null) {
+      for (TaskTrackerAction action : actions) {
+        if (action.getActionId() == 
+            TaskTrackerAction.ActionType.REINIT_TRACKER) {
+          LOG.info("Recieved ReinitTrackerAction from JobTracker");
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+    
+  /**
+   * Kill any tasks that have not reported progress in the last X seconds.
+   */
+  private synchronized void markUnresponsiveTasks() throws IOException {
+    long now = System.currentTimeMillis();
+    for (TaskInProgress tip: runningTasks.values()) {
+      if (tip.getRunState() == TaskStatus.State.RUNNING ||
+          tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+          tip.isCleaningup()) {
+        // Check the per-job timeout interval for tasks;
+        // an interval of '0' implies it is never timed-out
+        long jobTaskTimeout = tip.getTaskTimeout();
+        if (jobTaskTimeout == 0) {
+          continue;
+        }
+          
+        // Check if the task has not reported progress for a 
+        // time-period greater than the configured time-out
+        long timeSinceLastReport = now - tip.getLastProgressReport();
+        if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
+          String msg = 
+            "Task " + tip.getTask().getTaskID() + " failed to report status for " 
+            + (timeSinceLastReport / 1000) + " seconds. Killing!";
+          LOG.info(tip.getTask().getTaskID() + ": " + msg);
+          ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
+          tip.reportDiagnosticInfo(msg);
+          myInstrumentation.timedoutTask(tip.getTask().getTaskID());
+          purgeTask(tip, true);
+        }
+      }
+    }
+  }
+
+  /**
+   * The task tracker is done with this job, so we need to clean up.
+   * @param action The action with the job
+   * @throws IOException
+   */
+  synchronized void purgeJob(KillJobAction action) throws IOException {
+    JobID jobId = action.getJobID();
+    LOG.info("Received 'KillJobAction' for job: " + jobId);
+    RunningJob rjob = null;
+    synchronized (runningJobs) {
+      rjob = runningJobs.get(jobId);
+    }
+      
+    if (rjob == null) {
+      LOG.warn("Unknown job " + jobId + " being deleted.");
+    } else {
+      synchronized (rjob) {
+        // decrement the reference counts for the items this job references
+        rjob.distCacheMgr.release();
+        // Add this tips of this job to queue of tasks to be purged 
+        for (TaskInProgress tip : rjob.tasks) {
+          tip.jobHasFinished(false);
+          Task t = tip.getTask();
+          if (t.isMapTask()) {
+            indexCache.removeMap(tip.getTask().getTaskID().toString());
+          }
+        }
+        // Delete the job directory for this  
+        // task if the job is done/failed
+        if (!rjob.keepJobFiles) {
+          removeJobFiles(rjob.ugi.getShortUserName(), rjob.getJobID());
+        }
+        // add job to user log manager
+        long now = System.currentTimeMillis();
+        JobCompletedEvent jca = new JobCompletedEvent(rjob
+            .getJobID(), now, UserLogCleaner.getUserlogRetainHours(rjob
+            .getJobConf()));
+        getUserLogManager().addLogEvent(jca);
+
+        // Remove this job 
+        rjob.tasks.clear();
+        // Close all FileSystems for this job
+        try {
+          FileSystem.closeAllForUGI(rjob.getUGI());
+        } catch (IOException ie) {
+          LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) + 
+              " while closing FileSystem for " + rjob.getUGI());
+        }
+      }
+    }
+
+    synchronized(runningJobs) {
+      runningJobs.remove(jobId);
+    }
+    getJobTokenSecretManager().removeTokenForJob(jobId.toString());  
+  }      
+    
+  /**
+   * This job's files are no longer needed on this TT, remove them.
+   *
+   * @param rjob
+   * @throws IOException
+   */
+  void removeJobFiles(String user, JobID jobId) throws IOException {
+    String userDir = getUserDir(user);
+    String jobDir = getLocalJobDir(user, jobId.toString());
+    PathDeletionContext jobCleanup = 
+      new TaskController.DeletionContext(getTaskController(), false, user, 
+                                         jobDir.substring(userDir.length()));
+    directoryCleanupThread.addToQueue(jobCleanup);
+    
+    for (String str : localStorage.getDirs()) {
+      Path ttPrivateJobDir = FileSystem.getLocal(fConf).makeQualified(
+        new Path(str, TaskTracker.getPrivateDirForJob(user, jobId.toString())));
+      PathDeletionContext ttPrivateJobCleanup =
+        new CleanupQueue.PathDeletionContext(ttPrivateJobDir, fConf);
+      directoryCleanupThread.addToQueue(ttPrivateJobCleanup);
+    }
+  }
+
+  /**
+   * Remove the tip and update all relevant state.
+   * 
+   * @param tip {@link TaskInProgress} to be removed.
+   * @param wasFailure did the task fail or was it killed?
+   */
+  private void purgeTask(TaskInProgress tip, boolean wasFailure) 
+  throws IOException {
+    if (tip != null) {
+      LOG.info("About to purge task: " + tip.getTask().getTaskID());
+        
+      // Remove the task from running jobs, 
+      // removing the job if it's the last task
+      removeTaskFromJob(tip.getTask().getJobID(), tip);
+      tip.jobHasFinished(wasFailure);
+      if (tip.getTask().isMapTask()) {
+        indexCache.removeMap(tip.getTask().getTaskID().toString());
+      }
+    }
+  }
+
+  /** Check if we're dangerously low on disk space
+   * If so, kill jobs to free up space and make sure
+   * we don't accept any new tasks
+   * Try killing the reduce jobs first, since I believe they
+   * use up most space
+   * Then pick the one with least progress
+   */
+  private void killOverflowingTasks() throws IOException {
+    long localMinSpaceKill;
+    synchronized(this){
+      localMinSpaceKill = minSpaceKill;  
+    }
+    if (!enoughFreeSpace(localMinSpaceKill)) {
+      acceptNewTasks=false; 
+      //we give up! do not accept new tasks until
+      //all the ones running have finished and they're all cleared up
+      synchronized (this) {
+        TaskInProgress killMe = findTaskToKill(null);
+
+        if (killMe!=null) {
+          String msg = "Tasktracker running out of space." +
+            " Killing task.";
+          LOG.info(killMe.getTask().getTaskID() + ": " + msg);
+          killMe.reportDiagnosticInfo(msg);
+          purgeTask(killMe, false);
+        }
+      }
+    }
+  }
+
+  /**
+   * Pick a task to kill to free up memory/disk-space 
+   * @param tasksToExclude tasks that are to be excluded while trying to find a
+   *          task to kill. If null, all runningTasks will be searched.
+   * @return the task to kill or null, if one wasn't found
+   */
+  synchronized TaskInProgress findTaskToKill(List<TaskAttemptID> tasksToExclude) {
+    TaskInProgress killMe = null;
+    for (Iterator it = runningTasks.values().iterator(); it.hasNext();) {
+      TaskInProgress tip = (TaskInProgress) it.next();
+
+      if (tasksToExclude != null
+          && tasksToExclude.contains(tip.getTask().getTaskID())) {
+        // exclude this task
+        continue;
+      }
+
+      if ((tip.getRunState() == TaskStatus.State.RUNNING ||
+           tip.getRunState() == TaskStatus.State.COMMIT_PENDING) &&
+          !tip.wasKilled) {
+                
+        if (killMe == null) {
+          killMe = tip;
+
+        } else if (!tip.getTask().isMapTask()) {
+          //reduce task, give priority
+          if (killMe.getTask().isMapTask() || 
+              (tip.getTask().getProgress().get() < 
+               killMe.getTask().getProgress().get())) {
+
+            killMe = tip;
+          }
+
+        } else if (killMe.getTask().isMapTask() &&
+                   tip.getTask().getProgress().get() < 
+                   killMe.getTask().getProgress().get()) {
+          //map task, only add if the progress is lower
+
+          killMe = tip;
+        }
+      }
+    }
+    return killMe;
+  }
+
+  /**
+   * Check if any of the local directories has enough
+   * free space  (more than minSpace)
+   * 
+   * If not, do not try to get a new task assigned 
+   * @return
+   * @throws IOException 
+   */
+  private boolean enoughFreeSpace(long minSpace) throws IOException {
+    if (minSpace == 0) {
+      return true;
+    }
+    return minSpace < getFreeSpace();
+  }
+  
+  private long getFreeSpace() throws IOException {
+    long biggestSeenSoFar = 0;
+    String[] localDirs = localStorage.getDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      DF df = null;
+      if (localDirsDf.containsKey(localDirs[i])) {
+        df = localDirsDf.get(localDirs[i]);
+      } else {
+        df = new DF(new File(localDirs[i]), fConf);
+        localDirsDf.put(localDirs[i], df);
+      }
+
+      long availOnThisVol = df.getAvailable();
+      if (availOnThisVol > biggestSeenSoFar) {
+        biggestSeenSoFar = availOnThisVol;
+      }
+    }
+    
+    //Should ultimately hold back the space we expect running tasks to use but 
+    //that estimate isn't currently being passed down to the TaskTrackers    
+    return biggestSeenSoFar;
+  }
+    
+  private TaskLauncher mapLauncher;
+  private TaskLauncher reduceLauncher;
+  public JvmManager getJvmManagerInstance() {
+    return jvmManager;
+  }
+
+  // called from unit test  
+  void setJvmManagerInstance(JvmManager jvmManager) {
+    this.jvmManager = jvmManager;
+  }
+
+  private void addToTaskQueue(LaunchTaskAction action) {
+    if (action.getTask().isMapTask()) {
+      mapLauncher.addToTaskQueue(action);
+    } else {
+      reduceLauncher.addToTaskQueue(action);
+    }
+  }
+  
+  class TaskLauncher extends Thread {
+    private IntWritable numFreeSlots;
+    private final int maxSlots;
+    private List<TaskInProgress> tasksToLaunch;
+
+    public TaskLauncher(TaskType taskType, int numSlots) {
+      this.maxSlots = numSlots;
+      this.numFreeSlots = new IntWritable(numSlots);
+      this.tasksToLaunch = new LinkedList<TaskInProgress>();
+      setDaemon(true);
+      setName("TaskLauncher for " + taskType + " tasks");
+    }
+
+    public void addToTaskQueue(LaunchTaskAction action) {
+      synchronized (tasksToLaunch) {
+        TaskInProgress tip = registerTask(action, this);
+        tasksToLaunch.add(tip);
+        tasksToLaunch.notifyAll();
+      }
+    }
+    
+    public void cleanTaskQueue() {
+      tasksToLaunch.clear();
+    }
+    
+    public void addFreeSlots(int numSlots) {
+      synchronized (numFreeSlots) {
+        numFreeSlots.set(numFreeSlots.get() + numSlots);
+        assert (numFreeSlots.get() <= maxSlots);
+        LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
+        numFreeSlots.notifyAll();
+      }
+    }
+    
+    void notifySlots() {
+      synchronized (numFreeSlots) {
+        numFreeSlots.notifyAll();
+      }
+    }
+
+    int getNumWaitingTasksToLaunch() {
+      synchronized (tasksToLaunch) {
+        return tasksToLaunch.size();
+      }
+    }
+
+    public void run() {
+      while (!Thread.interrupted()) {
+        try {
+          TaskInProgress tip;
+          Task task;
+          synchronized (tasksToLaunch) {
+            while (tasksToLaunch.isEmpty()) {
+              tasksToLaunch.wait();
+            }
+            //get the TIP
+            tip = tasksToLaunch.remove(0);
+            task = tip.getTask();
+            LOG.info("Trying to launch : " + tip.getTask().getTaskID() + 
+                     " which needs " + task.getNumSlotsRequired() + " slots");
+          }
+          //wait for free slots to run
+          synchronized (numFreeSlots) {
+            boolean canLaunch = true;
+            while (numFreeSlots.get() < task.getNumSlotsRequired()) {
+              //Make sure that there is no kill task action for this task!
+              //We are not locking tip here, because it would reverse the
+              //locking order!
+              //Also, Lock for the tip is not required here! because :
+              // 1. runState of TaskStatus is volatile
+              // 2. Any notification is not missed because notification is
+              // synchronized on numFreeSlots. So, while we are doing the check,
+              // if the tip is half way through the kill(), we don't miss
+              // notification for the following wait().
+              if (!tip.canBeLaunched()) {
+                //got killed externally while still in the launcher queue
+                LOG.info("Not blocking slots for " + task.getTaskID()
+                    + " as it got killed externally. Task's state is "
+                    + tip.getRunState());
+                canLaunch = false;
+                break;
+              }
+              LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() + 
+                       " to launch " + task.getTaskID() + ", currently we have " + 
+                       numFreeSlots.get() + " free slots");
+              numFreeSlots.wait();
+            }
+            if (!canLaunch) {
+              continue;
+            }
+            LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
+                     " and trying to launch "+tip.getTask().getTaskID() + 
+                     " which needs " + task.getNumSlotsRequired() + " slots");
+            numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());
+            assert (numFreeSlots.get() >= 0);
+          }
+          synchronized (tip) {
+            //to make sure that there is no kill task action for this
+            if (!tip.canBeLaunched()) {
+              //got killed externally while still in the launcher queue
+              LOG.info("Not launching task " + task.getTaskID() + " as it got"
+                + " killed externally. Task's state is " + tip.getRunState());
+              addFreeSlots(task.getNumSlotsRequired());
+              continue;
+            }
+            tip.slotTaken = true;
+          }
+          //got a free slot. launch the task
+          startNewTask(tip);
+        } catch (InterruptedException e) { 
+          return; // ALL DONE
+        } catch (Throwable th) {
+          LOG.error("TaskLauncher error " + 
+              StringUtils.stringifyException(th));
+        }
+      }
+    }
+  }
+  private TaskInProgress registerTask(LaunchTaskAction action, 
+      TaskLauncher launcher) {
+    Task t = action.getTask();
+    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
+             " task's state:" + t.getState());
+    TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
+    synchronized (this) {
+      tasks.put(t.getTaskID(), tip);
+      runningTasks.put(t.getTaskID(), tip);
+      boolean isMap = t.isMapTask();
+      if (isMap) {
+        mapTotal++;
+      } else {
+        reduceTotal++;
+      }
+    }
+    return tip;
+  }
+
+  /**
+   * Start a new task.
+   * All exceptions are handled locally, so that we don't mess up the
+   * task tracker.
+   * @throws InterruptedException 
+   */
+  void startNewTask(final TaskInProgress tip) throws InterruptedException {
+    Thread launchThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          RunningJob rjob = localizeJob(tip);
+          tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString());
+          // Localization is done. Neither rjob.jobConf nor rjob.ugi can be null
+          launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); 
+        } catch (Throwable e) {
+          String msg = ("Error initializing " + tip.getTask().getTaskID() + 
+                        ":\n" + StringUtils.stringifyException(e));
+          LOG.warn(msg);
+          tip.reportDiagnosticInfo(msg);
+          try {
+            tip.kill(true);
+            tip.cleanup(true);
+          } catch (IOException ie2) {
+            LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+          } catch (InterruptedException ie2) {
+            LOG.info("Error cleaning up " + tip.getTask().getTaskID(), ie2);
+          }
+          if (e instanceof Error) {
+            LOG.error("TaskLauncher error " + 
+                StringUtils.stringifyException(e));
+          }
+        }
+      }
+    });
+    launchThread.start();
+  }
+
+  void addToMemoryManager(TaskAttemptID attemptId, boolean isMap,
+                          JobConf conf) {
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.addTask(attemptId, 
+          isMap ? conf
+              .getMemoryForMapTask() * 1024 * 1024L : conf
+              .getMemoryForReduceTask() * 1024 * 1024L);
+    }
+  }
+
+  void removeFromMemoryManager(TaskAttemptID attemptId) {
+    // Remove the entry from taskMemoryManagerThread's data structures.
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.removeTask(attemptId);
+    }
+  }
+
+  /** 
+   * Notify the tasktracker to send an out-of-band heartbeat.
+   */
+  private void notifyTTAboutTaskCompletion() {
+    if (oobHeartbeatOnTaskCompletion) {
+      synchronized (finishedCount) {
+        int value = finishedCount.get();
+        finishedCount.set(value+1);
+        finishedCount.notify();
+      }
+    }
+  }
+  
+  /**
+   * The server retry loop.  
+   * This while-loop attempts to connect to the JobTracker.  It only 
+   * loops when the old TaskTracker has gone bad (its state is
+   * stale somehow) and we need to reinitialize everything.
+   */
+  public void run() {
+    try {
+      getUserLogManager().start();
+      startCleanupThreads();
+      boolean denied = false;
+      while (running && !shuttingDown && !denied) {
+        boolean staleState = false;
+        try {
+          // This while-loop attempts reconnects if we get network errors
+          while (running && !staleState && !shuttingDown && !denied) {
+            try {
+              State osState = offerService();
+              if (osState == State.STALE) {
+                staleState = true;
+              } else if (osState == State.DENIED) {
+                denied = true;
+              }
+            } catch (Exception ex) {
+              if (!shuttingDown) {
+                LOG.info("Lost connection to JobTracker [" +
+                         jobTrackAddr + "].  Retrying...", ex);
+                try {
+                  Thread.sleep(5000);
+                } catch (InterruptedException ie) {
+                }
+              }
+            }
+          }
+        } finally {
+          close();
+        }
+        if (shuttingDown) { return; }
+        LOG.warn("Reinitializing local state");
+        initialize();
+      }
+      if (denied) {
+        shutdown();
+      }
+    } catch (IOException iex) {
+      LOG.error("Got fatal exception while reinitializing TaskTracker: " +
+                StringUtils.stringifyException(iex));
+      return;
+    }
+    catch (InterruptedException i) {
+      LOG.error("Got interrupted while reinitializing TaskTracker: " +
+          i.getMessage());
+      return;
+    }
+  }
+    
+  ///////////////////////////////////////////////////////
+  // TaskInProgress maintains all the info for a Task that
+  // lives at this TaskTracker.  It maintains the Task object,
+  // its TaskStatus, and the TaskRunner.
+  ///////////////////////////////////////////////////////
+  class TaskInProgress {
+    Task task;
+    long lastProgressReport;
+    StringBuffer diagnosticInfo = new StringBuffer();
+    private TaskRunner runner;
+    volatile boolean done = false;
+    volatile boolean wasKilled = false;
+    private JobConf ttConf;
+    private JobConf localJobConf;
+    private boolean keepFailedTaskFiles;
+    private boolean alwaysKeepTaskFiles;
+    private TaskStatus taskStatus; 
+    private long taskTimeout;
+    private String debugCommand;
+    private volatile boolean slotTaken = false;
+    private TaskLauncher launcher;
+
+    // The ugi of the user who is running the job. This contains all the tokens
+    // too which will be populated during job-localization
+    private UserGroupInformation ugi;
+
+    UserGroupInformation getUGI() {
+      return ugi;
+    }
+
+    void setUGI(UserGroupInformation userUGI) {
+      ugi = userUGI;
+    }
+
+    /**
+     */
+    public TaskInProgress(Task task, JobConf conf) {
+      this(task, conf, null);
+    }
+    
+    public TaskInProgress(Task task, JobConf conf, TaskLauncher launcher) {
+      this.task = task;
+      this.launcher = launcher;
+      this.lastProgressReport = System.currentTimeMillis();
+      this.ttConf = conf;
+      localJobConf = null;
+      taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
+                                               0.0f, 
+                                               task.getNumSlotsRequired(),
+                                               task.getState(),
+                                               diagnosticInfo.toString(), 
+                                               "initializing",  
+                                               getName(), 
+                                               task.isTaskCleanupTask() ? 
+                                                 TaskStatus.Phase.CLEANUP :  
+                                               task.isMapTask()? TaskStatus.Phase.MAP:
+                                               TaskStatus.Phase.SHUFFLE,
+                                               task.getCounters()); 
+      taskTimeout = (10 * 60 * 1000);
+    }
+        
+    void localizeTask(Task task) throws IOException{
+
+      // Do the task-type specific localization
+//TODO: are these calls really required
+      task.localizeConfiguration(localJobConf);
+      
+      task.setConf(localJobConf);
+    }
+        
+    /**
+     */
+    public Task getTask() {
+      return task;
+    }
+    
+    TaskRunner getTaskRunner() {
+      return runner;
+    }
+
+    void setTaskRunner(TaskRunner rnr) {
+      this.runner = rnr;
+    }
+
+    public synchronized void setJobConf(JobConf lconf){
+      this.localJobConf = lconf;
+      keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+      taskTimeout = localJobConf.getLong("mapred.task.timeout", 
+                                         10 * 60 * 1000);
+      if (task.isMapTask()) {
+        debugCommand = localJobConf.getMapDebugScript();
+      } else {

[... 1760 lines stripped ...]