You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [23/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java Sat Nov 28 20:26:01 2009
@@ -17,10 +17,10 @@
  */
  package org.apache.hadoop.mapred;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -36,6 +36,7 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.StringTokenizer;
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
@@ -64,12 +65,19 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskController.DebugScriptContext;
 import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
-import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.mapreduce.security.JobTokens;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -82,8 +90,9 @@
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.MemoryCalculatorPlugin;
-import org.apache.hadoop.util.ProcfsBasedProcessTree;
+import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.Service;
@@ -100,7 +109,7 @@
  *
  *******************************************************/
 public class TaskTracker extends Service
-             implements MRConstants, TaskUmbilicalProtocol, Runnable {
+    implements MRConstants, TaskUmbilicalProtocol, Runnable, TTConfig {
   /**
    * @deprecated
    */
@@ -120,21 +129,20 @@
   static enum State {NORMAL, STALE, INTERRUPTED, DENIED}
 
   static{
-    Configuration.addDefaultResource("mapred-default.xml");
-    Configuration.addDefaultResource("mapred-site.xml");
+    ConfigUtil.loadResources();
   }
 
   public static final Log LOG =
     LogFactory.getLog(TaskTracker.class);
 
   public static final String MR_CLIENTTRACE_FORMAT =
-        "src: %s" +     // src IP
-        ", dest: %s" +  // dst IP
-        ", bytes: %s" + // byte count
-        ", op: %s" +    // operation
-        ", cliID: %s" + // task id
-        ", reduceID: %s" + // reduce id
-        ", duration: %s"; // duration
+    "src: %s" +     // src IP
+    ", dest: %s" +  // dst IP
+    ", maps: %s" + // number of maps
+    ", op: %s" +    // operation
+    ", reduceID: %s" + // reduce id
+    ", duration: %s"; // duration
+
   public static final Log ClientTraceLog =
     LogFactory.getLog(TaskTracker.class.getName() + ".clienttrace");
 
@@ -152,7 +160,7 @@
 
   Server taskReportServer = null;
   InterTrackerProtocol jobClient;
-  
+
   private TrackerDistributedCacheManager distributedCacheManager;
     
   // last heartbeat response recieved
@@ -185,7 +193,8 @@
    * Map from taskId -> TaskInProgress.
    */
   Map<TaskAttemptID, TaskInProgress> runningTasks = null;
-  Map<JobID, RunningJob> runningJobs = null;
+  Map<JobID, RunningJob> runningJobs = new TreeMap<JobID, RunningJob>();
+
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
   boolean justStarted = true;
@@ -206,24 +215,36 @@
   //for serving map output to the other nodes
 
   static Random r = new Random();
-  static final String SUBDIR = "taskTracker";
-  private static final String DISTCACHEDIR = "distcache";
+  public static final String SUBDIR = "taskTracker";
+  static final String DISTCACHEDIR = "distcache";
   static final String JOBCACHE = "jobcache";
   static final String OUTPUT = "output";
   private static final String JARSDIR = "jars";
   static final String LOCAL_SPLIT_FILE = "split.dta";
   static final String JOBFILE = "job.xml";
+  static final String JOB_TOKEN_FILE="jobToken"; //localized file
 
-  static final String JOB_LOCAL_DIR = "job.local.dir";
+  static final String JOB_LOCAL_DIR = JobContext.JOB_LOCAL_DIR;
 
   private JobConf fConf;
-  private FileSystem localFs;
+  FileSystem localFs;
+
+  private Localizer localizer;
+
   private int maxMapSlots;
   private int maxReduceSlots;
   private int failures;
+  
+  // Performance-related config knob to send an out-of-band heartbeat
+  // on task completion
+  private volatile boolean oobHeartbeatOnTaskCompletion;
+  
+  // Track number of completed tasks to send an out-of-band heartbeat
+  private IntWritable finishedCount = new IntWritable(0);
+  
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
-  private CleanupQueue directoryCleanupThread;
+  CleanupQueue directoryCleanupThread;
   volatile JvmManager jvmManager;
   
   private TaskMemoryManagerThread taskMemoryManager;
@@ -235,7 +256,7 @@
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
-      "mapred.tasktracker.memory_calculator_plugin";
+      TT_MEMORY_CALCULATOR_PLUGIN;
 
   /**
    * the minimum interval between jobtracker polls
@@ -340,7 +361,7 @@
    */
   private TaskCleanupThread taskCleanupThread;
 
-  TaskController getTaskController() {
+  public TaskController getTaskController() {
     return taskController;
   }
   
@@ -377,68 +398,84 @@
     }
   }
 
-  static String getDistributedCacheDir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
+  Localizer getLocalizer() {
+    return localizer;
+  }
+
+  void setLocalizer(Localizer l) {
+    localizer = l;
+  }
+
+  public static String getUserDir(String user) {
+    return TaskTracker.SUBDIR + Path.SEPARATOR + user;
+  }
+
+  public static String getDistributedCacheDir(String user) {
+    return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR;
   }
 
-  static String getJobCacheSubdir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
+  public static String getJobCacheSubdir(String user) {
+    return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
 
-  static String getLocalJobDir(String jobid) {
-    return getJobCacheSubdir() + Path.SEPARATOR + jobid;
+  public static String getLocalJobDir(String user, String jobid) {
+    return getJobCacheSubdir(user) + Path.SEPARATOR + jobid;
   }
 
-  static String getLocalJobConfFile(String jobid) {
-    return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
+  static String getLocalJobConfFile(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
+  
+  static String getLocalJobTokenFile(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JOB_TOKEN_FILE;
+  }
+
 
-  static String getTaskConfFile(String jobid, String taskid,
+  static String getTaskConfFile(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
-    return getLocalTaskDir(jobid, taskid, isCleanupAttempt) + Path.SEPARATOR
-        + TaskTracker.JOBFILE;
+    return getLocalTaskDir(user, jobid, taskid, isCleanupAttempt)
+        + Path.SEPARATOR + TaskTracker.JOBFILE;
   }
 
-  static String getJobJarsDir(String jobid) {
-    return getLocalJobDir(jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
+  static String getJobJarsDir(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + TaskTracker.JARSDIR;
   }
 
-  static String getJobJarFile(String jobid) {
-    return getJobJarsDir(jobid) + Path.SEPARATOR + "job.jar";
+  static String getJobJarFile(String user, String jobid) {
+    return getJobJarsDir(user, jobid) + Path.SEPARATOR + "job.jar";
   }
 
-  static String getJobWorkDir(String jobid) {
-    return getLocalJobDir(jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
+  static String getJobWorkDir(String user, String jobid) {
+    return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR;
   }
 
-  static String getLocalSplitFile(String jobid, String taskid) {
-    return TaskTracker.getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+  static String getLocalSplitFile(String user, String jobid, String taskid) {
+    return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
         + TaskTracker.LOCAL_SPLIT_FILE;
   }
 
-  static String getIntermediateOutputDir(String jobid, String taskid) {
-    return getLocalTaskDir(jobid, taskid) + Path.SEPARATOR
+  static String getIntermediateOutputDir(String user, String jobid,
+      String taskid) {
+    return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR
         + TaskTracker.OUTPUT;
   }
 
-  static String getLocalTaskDir(String jobid, String taskid) {
-    return getLocalTaskDir(jobid, taskid, false);
+  static String getLocalTaskDir(String user, String jobid, String taskid) {
+    return getLocalTaskDir(user, jobid, taskid, false);
   }
 
-  static String getLocalTaskDir(String jobid, 
-                                String taskid, 
-                                boolean isCleanupAttempt) {
-	String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
-	if (isCleanupAttempt) { 
+  public static String getLocalTaskDir(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String taskDir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
       taskDir = taskDir + TASK_CLEANUP_SUFFIX;
-	}
-	return taskDir;
+    }
+    return taskDir;
   }
 
-  static String getTaskWorkDir(String jobid, String taskid,
+  static String getTaskWorkDir(String user, String jobid, String taskid,
       boolean isCleanupAttempt) {
-    String dir =
-      getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+    String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid;
     if (isCleanupAttempt) {
       dir = dir + TASK_CLEANUP_SUFFIX;
     }
@@ -482,14 +519,14 @@
 
     localFs = FileSystem.getLocal(fConf);
     // use configured nameserver & interface to get local hostname
-    if (fConf.get("slave.host.name") != null) {
-      this.localHostname = fConf.get("slave.host.name");
+    if (fConf.get(TT_HOST_NAME) != null) {
+      this.localHostname = fConf.get(TT_HOST_NAME);
     }
     if (localHostname == null) {
       this.localHostname =
       DNS.getDefaultHost
-      (fConf.get("mapred.tasktracker.dns.interface","default"),
-       fConf.get("mapred.tasktracker.dns.nameserver","default"));
+      (fConf.get(TT_DNS_INTERFACE,"default"),
+       fConf.get(TT_DNS_NAMESERVER,"default"));
     }
  
     //check local disk
@@ -505,10 +542,11 @@
     this.acceptNewTasks = true;
     this.status = null;
 
-    this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
-    this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
+    this.minSpaceStart = this.fConf.getLong(TT_LOCAL_DIR_MINSPACE_START, 0L);
+    this.minSpaceKill = this.fConf.getLong(TT_LOCAL_DIR_MINSPACE_KILL, 0L);
     //tweak the probe sample size (make it a function of numCopiers)
-    probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);
+    probe_sample_size = 
+      this.fConf.getInt(TT_MAX_TASK_COMPLETION_EVENTS_TO_POLL, 500);
     
     Class<? extends TaskTrackerInstrumentation> metricsInst = getInstrumentationClass(fConf);
     try {
@@ -524,7 +562,7 @@
     
     // bind address
     InetSocketAddress socAddr = NetUtils.createSocketAddr(
-        fConf.get("mapred.task.tracker.report.address", "127.0.0.1:0"));
+        fConf.get(TT_REPORT_ADDRESS, "127.0.0.1:0"));
     String bindAddress = socAddr.getHostName();
     int tmpPort = socAddr.getPort();
     
@@ -552,7 +590,7 @@
 
     // get the assigned address
     this.taskReportAddress = taskReportServer.getListenerAddress();
-    this.fConf.set("mapred.task.tracker.report.address",
+    this.fConf.set(TT_REPORT_ADDRESS,
         taskReportAddress.getHostName() + ":" + taskReportAddress.getPort());
     LOG.info("TaskTracker up at: " + this.taskReportAddress);
 
@@ -596,7 +634,7 @@
     mapLauncher.start();
     reduceLauncher.start();
     Class<? extends TaskController> taskControllerClass 
-                          = fConf.getClass("mapred.task.tracker.task-controller",
+                          = fConf.getClass(TT_TASK_CONTROLLER,
                                             DefaultTaskController.class, 
                                             TaskController.class); 
     taskController = (TaskController)ReflectionUtils.newInstance(
@@ -604,22 +642,28 @@
     
     //setup and create jobcache directory with appropriate permissions
     taskController.setup();
-    
+
+    // create a localizer instance
+    setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController));
+
     //Start up node health checker service.
     if (shouldStartHealthMonitor(this.fConf)) {
       startHealthMonitor(this.fConf);
     }
+    
+    oobHeartbeatOnTaskCompletion = 
+      fConf.getBoolean(TT_OUTOFBAND_HEARBEAT, false);
   }
 
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
     Configuration conf) {
-    return conf.getClass("mapred.tasktracker.instrumentation",
+    return conf.getClass(TT_INSTRUMENTATION,
         TaskTrackerMetricsInst.class, TaskTrackerInstrumentation.class);
   }
 
   public static void setInstrumentationClass(
     Configuration conf, Class<? extends TaskTrackerInstrumentation> t) {
-    conf.setClass("mapred.tasktracker.instrumentation",
+    conf.setClass(TT_INSTRUMENTATION,
         t, TaskTrackerInstrumentation.class);
   }
   
@@ -807,7 +851,7 @@
   }
 
   private static LocalDirAllocator lDirAlloc = 
-                              new LocalDirAllocator("mapred.local.dir");
+                              new LocalDirAllocator(MRConfig.LOCAL_DIR);
 
   // intialize the job directory
   private void localizeJob(TaskInProgress tip) throws IOException {
@@ -815,6 +859,9 @@
     JobID jobId = t.getJobID();
     RunningJob rjob = addTaskToJob(jobId, tip);
 
+    // Initialize the user directories if needed.
+    getLocalizer().initializeUserDirs(t.getUser());
+
     synchronized (rjob) {
       if (!rjob.localized) {
 
@@ -833,6 +880,12 @@
         rjob.jobConf = localJobConf;
         rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
                              localJobConf.getKeepFailedTaskFiles());
+        FSDataInputStream in = localFs.open(new Path(
+            rjob.jobConf.get(JobContext.JOB_TOKEN_FILE)));
+        JobTokens jt = new JobTokens();
+        jt.readFields(in); 
+        rjob.jobTokens = jt; // store JobToken object per job
+        
         rjob.localized = true;
       }
     }
@@ -858,21 +911,23 @@
   JobConf localizeJobFiles(Task t)
       throws IOException {
     JobID jobId = t.getJobID();
+    String userName = t.getUser();
 
-    // Initialize the job directories first
+    // Initialize the job directories
     FileSystem localFs = FileSystem.getLocal(fConf);
-    initializeJobDirs(jobId, localFs, fConf.getStrings("mapred.local.dir"));
+    getLocalizer().initializeJobDirs(userName, jobId);
 
     // Download the job.xml for this job from the system FS
-    Path localJobFile = localizeJobConfFile(new Path(t.getJobFile()), jobId);
+    Path localJobFile =
+        localizeJobConfFile(new Path(t.getJobFile()), userName, jobId);
 
     JobConf localJobConf = new JobConf(localJobFile);
 
     // create the 'job-work' directory: job-specific shared directory for use as
     // scratch space by all tasks of the same job running on this TaskTracker. 
     Path workDir =
-        lDirAlloc.getLocalPathForWrite(getJobWorkDir(jobId.toString()),
-            fConf);
+        lDirAlloc.getLocalPathForWrite(getJobWorkDir(userName, jobId
+            .toString()), fConf);
     if (!localFs.mkdirs(workDir)) {
       throw new IOException("Mkdirs failed to create " 
                   + workDir.toString());
@@ -881,153 +936,12 @@
     localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath());
 
     // Download the job.jar for this job from the system FS
-    localizeJobJarFile(jobId, localFs, localJobConf);
+    localizeJobJarFile(userName, jobId, localFs, localJobConf);
+    // save local copy of JobToken file
+    localizeJobTokenFile(userName, jobId, localJobConf);
     return localJobConf;
   }
 
-  static class PermissionsHandler {
-    /**
-     * Permission information useful for setting permissions for a given path.
-     * Using this, one can set all possible combinations of permissions for the
-     * owner of the file. But permissions for the group and all others can only
-     * be set together, i.e. permissions for group cannot be set different from
-     * those for others and vice versa.
-     */
-    static class PermissionsInfo {
-      public boolean readPermissions;
-      public boolean writePermissions;
-      public boolean executablePermissions;
-      public boolean readPermsOwnerOnly;
-      public boolean writePermsOwnerOnly;
-      public boolean executePermsOwnerOnly;
-
-      /**
-       * Create a permissions-info object with the given attributes
-       * 
-       * @param readPerms
-       * @param writePerms
-       * @param executePerms
-       * @param readOwnerOnly
-       * @param writeOwnerOnly
-       * @param executeOwnerOnly
-       */
-      public PermissionsInfo(boolean readPerms, boolean writePerms,
-          boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
-          boolean executeOwnerOnly) {
-        readPermissions = readPerms;
-        writePermissions = writePerms;
-        executablePermissions = executePerms;
-        readPermsOwnerOnly = readOwnerOnly;
-        writePermsOwnerOnly = writeOwnerOnly;
-        executePermsOwnerOnly = executeOwnerOnly;
-      }
-    }
-
-    /**
-     * Set permission on the given file path using the specified permissions
-     * information. We use java api to set permission instead of spawning chmod
-     * processes. This saves a lot of time. Using this, one can set all possible
-     * combinations of permissions for the owner of the file. But permissions
-     * for the group and all others can only be set together, i.e. permissions
-     * for group cannot be set different from those for others and vice versa.
-     * 
-     * This method should satisfy the needs of most of the applications. For
-     * those it doesn't, {@link FileUtil#chmod} can be used.
-     * 
-     * @param f file path
-     * @param pInfo permissions information
-     * @return true if success, false otherwise
-     */
-    static boolean setPermissions(File f, PermissionsInfo pInfo) {
-      if (pInfo == null) {
-        LOG.debug(" PermissionsInfo is null, returning.");
-        return true;
-      }
-
-      LOG.debug("Setting permission for " + f.getAbsolutePath());
-
-      boolean ret = true;
-
-      // Clear all the flags
-      ret = f.setReadable(false, false) && ret;
-      ret = f.setWritable(false, false) && ret;
-      ret = f.setExecutable(false, false) && ret;
-
-      ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
-      LOG.debug("Readable status for " + f + " set to " + ret);
-      ret =
-          f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly)
-              && ret;
-      LOG.debug("Writable status for " + f + " set to " + ret);
-      ret =
-          f.setExecutable(pInfo.executablePermissions,
-              pInfo.executePermsOwnerOnly)
-              && ret;
-
-      LOG.debug("Executable status for " + f + " set to " + ret);
-      return ret;
-    }
-
-    /**
-     * Permissions rwxr_xr_x
-     */
-    static PermissionsInfo sevenFiveFive =
-        new PermissionsInfo(true, true, true, false, true, false);
-    /**
-     * Completely private permissions
-     */
-    static PermissionsInfo sevenZeroZero =
-        new PermissionsInfo(true, true, true, true, true, true);
-  }
-
-  /**
-   * Prepare the job directories for a given job. To be called by the job
-   * localization code, only if the job is not already localized.
-   * 
-   * <br>
-   * Here, we set 700 permissions on the job directories created on all disks.
-   * This we do so as to avoid any misuse by other users till the time
-   * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
-   * later time to set proper private permissions on the job directories. <br>
-   * 
-   * @param jobId
-   * @param fs
-   * @param localDirs
-   * @throws IOException
-   */
-  private static void initializeJobDirs(JobID jobId, FileSystem fs,
-      String[] localDirs)
-      throws IOException {
-    boolean initJobDirStatus = false;
-    String jobDirPath = getLocalJobDir(jobId.toString());
-    for (String localDir : localDirs) {
-      Path jobDir = new Path(localDir, jobDirPath);
-      if (fs.exists(jobDir)) {
-        // this will happen on a partial execution of localizeJob. Sometimes
-        // copying job.xml to the local disk succeeds but copying job.jar might
-        // throw out an exception. We should clean up and then try again.
-        fs.delete(jobDir, true);
-      }
-
-      boolean jobDirStatus = fs.mkdirs(jobDir);
-      if (!jobDirStatus) {
-        LOG.warn("Not able to create job directory " + jobDir.toString());
-      }
-
-      initJobDirStatus = initJobDirStatus || jobDirStatus;
-
-      // job-dir has to be private to the TT
-      PermissionsHandler.setPermissions(new File(jobDir.toUri().getPath()),
-          PermissionsHandler.sevenZeroZero);
-    }
-
-    if (!initJobDirStatus) {
-      throw new IOException("Not able to initialize job directories "
-          + "in any of the configured local directories for job "
-          + jobId.toString());
-    }
-  }
-
   /**
    * Download the job configuration file from the FS.
    * 
@@ -1036,7 +950,7 @@
    * @return the local file system path of the downloaded file.
    * @throws IOException
    */
-  private Path localizeJobConfFile(Path jobFile, JobID jobId)
+  private Path localizeJobConfFile(Path jobFile, String user, JobID jobId)
       throws IOException {
     // Get sizes of JobFile and JarFile
     // sizes are -1 if they are not present.
@@ -1050,7 +964,7 @@
     }
 
     Path localJobFile =
-        lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(jobId.toString()),
+        lDirAlloc.getLocalPathForWrite(getLocalJobConfFile(user, jobId.toString()),
             jobFileSize, fConf);
 
     // Download job.xml
@@ -1067,7 +981,7 @@
    * @param localJobConf
    * @throws IOException
    */
-  private void localizeJobJarFile(JobID jobId, FileSystem localFs,
+  private void localizeJobJarFile(String user, JobID jobId, FileSystem localFs,
       JobConf localJobConf)
       throws IOException {
     // copy Jar file to the local FS and unjar it.
@@ -1082,11 +996,11 @@
       } catch (FileNotFoundException fe) {
         jarFileSize = -1;
       }
-      // Here we check for and we check five times the size of jarFileSize
-      // to accommodate for unjarring the jar file in userfiles directory
+      // Here we check for five times the size of jarFileSize to leave room for
+      // unjarring the jar file in the jars directory
       Path localJarFile =
-          lDirAlloc.getLocalPathForWrite(getJobJarFile(jobId.toString()),
-              5 * jarFileSize, fConf);
+          lDirAlloc.getLocalPathForWrite(
+              getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf);
 
       // Download job.jar
       systemFS.copyToLocalFile(jarFilePath, localJarFile);
@@ -1100,45 +1014,6 @@
     }
   }
 
-  /**
-   * Create taskDirs on all the disks. Otherwise, in some cases, like when
-   * LinuxTaskController is in use, child might wish to balance load across
-   * disks but cannot itself create attempt directory because of the fact that
-   * job directory is writable only by the TT.
-   * 
-   * @param jobId
-   * @param attemptId
-   * @param isCleanupAttempt
-   * @param fs
-   * @param localDirs
-   * @throws IOException
-   */
-  private static void initializeAttemptDirs(String jobId, String attemptId,
-      boolean isCleanupAttempt, FileSystem fs, String[] localDirs)
-      throws IOException {
-
-    boolean initStatus = false;
-    String attemptDirPath =
-        getLocalTaskDir(jobId, attemptId, isCleanupAttempt);
-
-    for (String localDir : localDirs) {
-      Path localAttemptDir = new Path(localDir, attemptDirPath);
-
-      boolean attemptDirStatus = fs.mkdirs(localAttemptDir);
-      if (!attemptDirStatus) {
-        LOG.warn("localAttemptDir " + localAttemptDir.toString()
-            + " couldn't be created.");
-      }
-      initStatus = initStatus || attemptDirStatus;
-    }
-
-    if (!initStatus) {
-      throw new IOException("Not able to initialize attempt directories "
-          + "in any of the configured local directories for the attempt "
-          + attemptId.toString());
-    }
-  }
-
   private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
     synchronized (tip) {
       tip.setJobConf(jobConf);
@@ -1205,7 +1080,7 @@
     }
 
     this.running = false;
-        
+
     // Clear local storage
     cleanupStorage();
         
@@ -1288,7 +1163,7 @@
   protected TaskTracker(JobConf conf, boolean start) throws IOException {
     super(conf);
     fConf = conf;
-    //for backwards compatibility, the task tracker starts up unless told not 
+    //for backwards compatibility, the task tracker starts up unless told not
     //to. Subclasses should be very cautious about having their superclass    
     //do that as subclassed methods can be invoked before the class is fully  
     //configured                                                              
@@ -1308,22 +1183,22 @@
           throws IOException, InterruptedException {
     JobConf conf = fConf;
     fConf = conf;
-    maxMapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum", 2);
-    maxReduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum", 2);
+    maxMapSlots = conf.getInt(TT_MAP_SLOTS, 2);
+    maxReduceSlots = conf.getInt(TT_REDUCE_SLOTS, 2);
     this.jobTrackAddr = JobTracker.getAddress(conf);
     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(
-        conf.get("mapred.task.tracker.http.address", "0.0.0.0:50060"));
+        conf.get(TT_HTTP_ADDRESS, "0.0.0.0:50060"));
     String httpBindAddress = infoSocAddr.getHostName();
     int httpPort = infoSocAddr.getPort();
     this.server = new HttpServer("task", httpBindAddress, httpPort,
         httpPort == 0, conf);
-    workerThreads = conf.getInt("tasktracker.http.threads", 40);
+    workerThreads = conf.getInt(TT_HTTP_THREADS, 40);
     this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
     server.setThreads(1, workerThreads);
     // let the jsp pages get to the task tracker, config, and other relevant
     // objects
     FileSystem local = FileSystem.getLocal(conf);
-    this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
+    this.localDirAllocator = new LocalDirAllocator(MRConfig.LOCAL_DIR);
     server.setAttribute("task.tracker", this);
     server.setAttribute("local.file.system", local);
     server.setAttribute("conf", conf);
@@ -1398,7 +1273,7 @@
     List <TaskCompletionEvent> recentMapEvents = 
       new ArrayList<TaskCompletionEvent>();
     for (int i = 0; i < t.length; i++) {
-      if (t[i].isMap) {
+      if (t[i].isMapTask()) {
         recentMapEvents.add(t[i]);
       }
     }
@@ -1419,8 +1294,14 @@
 
         long waitTime = heartbeatInterval - (now - lastHeartbeat);
         if (waitTime > 0) {
-          // sleeps for the wait time
-          Thread.sleep(waitTime);
+          // sleeps for the wait time or
+          // until there are empty slots to schedule tasks
+          synchronized (finishedCount) {
+            if (finishedCount.get() == 0) {
+              finishedCount.wait(waitTime);
+            }
+            finishedCount.set(0);
+          }
         }
 
         // If the TaskTracker is just starting up:
@@ -1461,39 +1342,6 @@
         // next heartbeat   
         lastHeartbeat = System.currentTimeMillis();
         
-        
-        // Check if the map-event list needs purging
-        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
-        if (jobs.size() > 0) {
-          synchronized (this) {
-            // purge the local map events list
-            for (JobID job : jobs) {
-              RunningJob rjob;
-              synchronized (runningJobs) {
-                rjob = runningJobs.get(job);          
-                if (rjob != null) {
-                  synchronized (rjob) {
-                    FetchStatus f = rjob.getFetchStatus();
-                    if (f != null) {
-                      f.reset();
-                    }
-                  }
-                }
-              }
-            }
-
-            // Mark the reducers in shuffle for rollback
-            synchronized (shouldReset) {
-              for (Map.Entry<TaskAttemptID, TaskInProgress> entry 
-                   : runningTasks.entrySet()) {
-                if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
-                  this.shouldReset.add(entry.getKey());
-                }
-              }
-            }
-          }
-        }
-        
         TaskTrackerAction[] actions = heartbeatResponse.getActions();
         if(LOG.isDebugEnabled()) {
           LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + 
@@ -1798,9 +1646,8 @@
         }
         // Delete the job directory for this  
         // task if the job is done/failed
-        if (!rjob.keepJobFiles){
-          directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf, 
-            getLocalJobDir(rjob.getJobID().toString())));
+        if (!rjob.keepJobFiles) {
+          removeJobFiles(rjob.jobConf.getUser(), rjob.getJobID().toString());
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -1810,10 +1657,20 @@
     synchronized(runningJobs) {
       runningJobs.remove(jobId);
     }
-    
-  }      
-    
-    
+  }
+
+  /**
+   * This job's files are no longer needed on this TT, so queue them for
+   * removal: the job's local directory is expanded to its full paths with
+   * getLocalFiles and handed to the directory cleanup thread.
+   * 
+   * @param user the user name passed to getLocalJobDir to locate the job's
+   *          local directory
+   * @param jobId the string form of the job's ID
+   * @throws IOException propagated from getLocalFiles or addToQueue
+   */
+  void removeJobFiles(String user, String jobId)
+      throws IOException {
+    directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf,
+        getLocalJobDir(user, jobId)));
+  }
+
   /**
    * Remove the tip and update all relevant state.
    * 
@@ -2148,6 +2005,19 @@
     }
   }
 
+  /**
+   * Signal that a task has completed so that an out-of-band heartbeat can
+   * be sent. Does nothing unless out-of-band heartbeats on task completion
+   * are enabled; otherwise bumps the finished-task counter and notifies a
+   * thread waiting on it.
+   */
+  private void notifyTTAboutTaskCompletion() {
+    if (!oobHeartbeatOnTaskCompletion) {
+      return;
+    }
+    synchronized (finishedCount) {
+      finishedCount.set(finishedCount.get() + 1);
+      finishedCount.notify();
+    }
+  }
+  
   /**
    * The server retry loop.  
    * This while-loop attempts to connect to the JobTracker.  It only 
@@ -2254,26 +2124,25 @@
       FileSystem localFs = FileSystem.getLocal(fConf);
 
       // create taskDirs on all the disks.
-      initializeAttemptDirs(task.getJobID().toString(), task.getTaskID()
-          .toString(), task.isTaskCleanupTask(), localFs, fConf
-          .getStrings("mapred.local.dir"));
+      getLocalizer().initializeAttemptDirs(task.getUser(),
+          task.getJobID().toString(), task.getTaskID().toString(),
+          task.isTaskCleanupTask());
 
       // create the working-directory of the task 
       Path cwd =
-          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getJobID()
-              .toString(), task.getTaskID().toString(), task
+          lDirAlloc.getLocalPathForWrite(getTaskWorkDir(task.getUser(), task
+              .getJobID().toString(), task.getTaskID().toString(), task
               .isTaskCleanupTask()), defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
         throw new IOException("Mkdirs failed to create " 
                     + cwd.toString());
       }
 
-      localJobConf.set("mapred.local.dir",
-                       fConf.get("mapred.local.dir"));
+      localJobConf.set(LOCAL_DIR,
+                       fConf.get(LOCAL_DIR));
 
-      if (fConf.get("slave.host.name") != null) {
-        localJobConf.set("slave.host.name",
-                         fConf.get("slave.host.name"));
+      if (fConf.get(TT_HOST_NAME) != null) {
+        localJobConf.set(TT_HOST_NAME, fConf.get(TT_HOST_NAME));
       }
             
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
@@ -2292,7 +2161,7 @@
             str.append(',');
           }
         }
-        localJobConf.set("hadoop.net.static.resolutions", str.toString());
+        localJobConf.set(TT_STATIC_RESOLUTIONS, str.toString());
       }
       if (task.isMapTask()) {
         debugCommand = localJobConf.getMapDebugScript();
@@ -2320,14 +2189,18 @@
       return task;
     }
     
-    public TaskRunner getTaskRunner() {
+    TaskRunner getTaskRunner() {
       return runner;
     }
 
+    /**
+     * Replace the runner stored for this task.
+     *
+     * @param rnr the runner to keep in the <code>runner</code> field
+     */
+    void setTaskRunner(TaskRunner rnr) {
+      this.runner = rnr;
+    }
+
     public synchronized void setJobConf(JobConf lconf){
       this.localJobConf = lconf;
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
-      taskTimeout = localJobConf.getLong("mapred.task.timeout", 
+      taskTimeout = localJobConf.getLong(JobContext.TASK_TIMEOUT, 
                                          10 * 60 * 1000);
     }
         
@@ -2357,7 +2230,7 @@
         if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
           this.taskStatus.setRunState(TaskStatus.State.RUNNING);
         }
-        this.runner = task.createRunner(TaskTracker.this, this);
+        setTaskRunner(task.createRunner(TaskTracker.this, this));
         this.runner.start();
         this.taskStatus.setStartTime(System.currentTimeMillis());
       } else {
@@ -2465,9 +2338,21 @@
       return wasKilled;
     }
 
-    void reportTaskFinished() {
-      taskFinished();
-      releaseSlot();
+    /**
+     * A task is reporting in as 'done'.
+     * 
+     * If the task's commit isn't pending, the task is finalized and the
+     * slot it occupies is released. In either case the tasktracker is then
+     * notified so that it can send an out-of-band heartbeat.
+     * 
+     * @param commitPending is the task-commit pending?
+     */
+    void reportTaskFinished(boolean commitPending) {
+      if (!commitPending) {
+        taskFinished();
+        releaseSlot();
+      }
+      notifyTTAboutTaskCompletion();
     }
 
     /* State changes:
@@ -2535,84 +2420,7 @@
             setTaskFailState(true);
             // call the script here for the failed tasks.
             if (debugCommand != null) {
-              String taskStdout ="";
-              String taskStderr ="";
-              String taskSyslog ="";
-              String jobConf = task.getJobFile();
-              try {
-                // get task's stdout file 
-                taskStdout = FileUtil.makeShellPath(
-                    TaskLog.getRealTaskLogFileLocation
-                                  (task.getTaskID(), TaskLog.LogName.STDOUT));
-                // get task's stderr file 
-                taskStderr = FileUtil.makeShellPath(
-                    TaskLog.getRealTaskLogFileLocation
-                                  (task.getTaskID(), TaskLog.LogName.STDERR));
-                // get task's syslog file 
-                taskSyslog = FileUtil.makeShellPath(
-                    TaskLog.getRealTaskLogFileLocation
-                                  (task.getTaskID(), TaskLog.LogName.SYSLOG));
-              } catch(IOException e){
-                LOG.warn("Exception finding task's stdout/err/syslog files");
-              }
-              File workDir = null;
-              try {
-                workDir = new File(lDirAlloc.getLocalPathToRead(
-                                     TaskTracker.getLocalTaskDir( 
-                                       task.getJobID().toString(), 
-                                       task.getTaskID().toString(),
-                                       task.isTaskCleanupTask())
-                                     + Path.SEPARATOR + MRConstants.WORKDIR,
-                                     localJobConf). toString());
-              } catch (IOException e) {
-                LOG.warn("Working Directory of the task " + task.getTaskID() +
-                                " doesnt exist. Caught exception " +
-                          StringUtils.stringifyException(e));
-              }
-              // Build the command  
-              File stdout = TaskLog.getRealTaskLogFileLocation(
-                                   task.getTaskID(), TaskLog.LogName.DEBUGOUT);
-              // add pipes program as argument if it exists.
-              String program ="";
-              String executable = Submitter.getExecutable(localJobConf);
-              if ( executable != null) {
-            	try {
-            	  program = new URI(executable).getFragment();
-            	} catch (URISyntaxException ur) {
-            	  LOG.warn("Problem in the URI fragment for pipes executable");
-            	}	  
-              }
-              String [] debug = debugCommand.split(" ");
-              Vector<String> vargs = new Vector<String>();
-              for (String component : debug) {
-                vargs.add(component);
-              }
-              vargs.add(taskStdout);
-              vargs.add(taskStderr);
-              vargs.add(taskSyslog);
-              vargs.add(jobConf);
-              vargs.add(program);
-              try {
-                List<String>  wrappedCommand = TaskLog.captureDebugOut
-                                                          (vargs, stdout);
-                // run the script.
-                try {
-                  runScript(wrappedCommand, workDir);
-                } catch (IOException ioe) {
-                  LOG.warn("runScript failed with: " + StringUtils.
-                                                      stringifyException(ioe));
-                }
-              } catch(IOException e) {
-                LOG.warn("Error in preparing wrapped debug command");
-              }
-
-              // add all lines of debug out to diagnostics
-              try {
-                int num = localJobConf.getInt("mapred.debug.out.lines", -1);
-                addDiagnostics(FileUtil.makeShellPath(stdout),num,"DEBUG OUT");
-              } catch(IOException ioe) {
-                LOG.warn("Exception in add diagnostics!");
-              }
+              runDebugScript();
             }
           }
           taskStatus.setProgress(0.0f);
@@ -2640,21 +2448,84 @@
 
     }
     
-
-    /**
-     * Runs the script given in args
-     * @param args script name followed by its argumnets
-     * @param dir current working directory.
-     * @throws IOException
-     */
-    public void runScript(List<String> args, File dir) throws IOException {
-      ShellCommandExecutor shexec = 
-              new ShellCommandExecutor(args.toArray(new String[0]), dir);
-      shexec.execute();
-      int exitCode = shexec.getExitCode();
-      if (exitCode != 0) {
-        throw new IOException("Task debug script exit with nonzero status of " 
-                              + exitCode + ".");
+    /**
+     * Run the configured debug script for this task and append its output
+     * to the task's diagnostics.
+     * 
+     * The command line is the debug command split on single spaces, followed
+     * by the task's stdout, stderr and syslog paths, the job file and the
+     * pipes program name. Execution is delegated to the task controller.
+     * Every failure along the way is logged as a warning and never
+     * propagated to the caller.
+     */
+    private void runDebugScript() {
+      String taskStdout ="";
+      String taskStderr ="";
+      String taskSyslog ="";
+      String jobConf = task.getJobFile();
+      try {
+        // get task's stdout file
+        taskStdout = FileUtil.makeShellPath(
+            TaskLog.getRealTaskLogFileLocation
+                          (task.getTaskID(), TaskLog.LogName.STDOUT));
+        // get task's stderr file
+        taskStderr = FileUtil.makeShellPath(
+            TaskLog.getRealTaskLogFileLocation
+                          (task.getTaskID(), TaskLog.LogName.STDERR));
+        // get task's syslog file
+        taskSyslog = FileUtil.makeShellPath(
+            TaskLog.getRealTaskLogFileLocation
+                          (task.getTaskID(), TaskLog.LogName.SYSLOG));
+      } catch(IOException e){
+        // best effort: paths not resolved before the failure stay ""
+        LOG.warn("Exception finding task's stdout/err/syslog files");
+      }
+      // working directory of the task; stays null if it cannot be located
+      File workDir = null;
+      try {
+        workDir =
+            new File(lDirAlloc.getLocalPathToRead(
+                TaskTracker.getLocalTaskDir(task.getUser(), task
+                    .getJobID().toString(), task.getTaskID()
+                    .toString(), task.isTaskCleanupTask())
+                    + Path.SEPARATOR + MRConstants.WORKDIR,
+                localJobConf).toString());
+      } catch (IOException e) {
+        LOG.warn("Working Directory of the task " + task.getTaskID() +
+                        " doesnt exist. Caught exception " +
+                  StringUtils.stringifyException(e));
+      }
+      // Build the command; the script's own output goes to the DEBUGOUT log
+      File stdout = TaskLog.getRealTaskLogFileLocation(
+                           task.getTaskID(), TaskLog.LogName.DEBUGOUT);
+      // add pipes program as argument if it exists.
+      String program ="";
+      String executable = Submitter.getExecutable(localJobConf);
+      if ( executable != null) {
+        try {
+          // NOTE(review): URI.getFragment() returns null when the URI has no
+          // fragment, so program may be null here - confirm that is tolerated
+          program = new URI(executable).getFragment();
+        } catch (URISyntaxException ur) {
+          LOG.warn("Problem in the URI fragment for pipes executable");
+        }     
+      }
+      String [] debug = debugCommand.split(" ");
+      List<String> vargs = new ArrayList<String>();
+      for (String component : debug) {
+        vargs.add(component);
+      }
+      // fixed trailing arguments, in this order
+      vargs.add(taskStdout);
+      vargs.add(taskStderr);
+      vargs.add(taskSyslog);
+      vargs.add(jobConf);
+      vargs.add(program);
+      // hand everything the controller needs over in one context object
+      DebugScriptContext context = 
+        new TaskController.DebugScriptContext();
+      context.args = vargs;
+      context.stdout = stdout;
+      context.workDir = workDir;
+      context.task = task;
+      try {
+        getTaskController().runDebugScript(context);
+        // add lines of debug out to diagnostics; reached only when the
+        // script run did not throw
+        try {
+          int num = localJobConf.getInt(JobContext.TASK_DEBUGOUT_LINES,
+              -1);
+          addDiagnostics(FileUtil.makeShellPath(stdout),num,
+              "DEBUG OUT");
+        } catch(IOException ioe) {
+          LOG.warn("Exception in add diagnostics!");
+        }
+      } catch (IOException ie) {
+        LOG.warn("runDebugScript failed with: " + StringUtils.
+                                              stringifyException(ie));
       }
     }
 
@@ -2767,8 +2638,10 @@
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
       }
+      taskStatus.setFinishTime(System.currentTimeMillis());
       removeFromMemoryManager(task.getTaskID());
       releaseSlot();
+      notifyTTAboutTaskCompletion();
     }
     
     private synchronized void releaseSlot() {
@@ -2835,50 +2708,60 @@
         }
       }
       synchronized (this) {
+        // localJobConf could be null if localization has not happened
+        // then no cleanup will be required.
+        if (localJobConf == null) {
+          return;
+        }
         try {
-          // localJobConf could be null if localization has not happened
-          // then no cleanup will be required.
-          if (localJobConf == null) {
-            return;
-          }
-          String localTaskDir =
-              getLocalTaskDir(task.getJobID().toString(), taskId.toString(),
-                  task.isTaskCleanupTask());
-          String taskWorkDir =
-              getTaskWorkDir(task.getJobID().toString(), taskId.toString(),
-                  task.isTaskCleanupTask());
-          if (needCleanup) {
-            if (runner != null) {
-              //cleans up the output directory of the task (where map outputs 
-              //and reduce inputs get stored)
-              runner.close();
-            }
-
-            if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              // No jvm reuse, remove everything
-              directoryCleanupThread.addToQueue(localFs,
-                  getLocalFiles(defaultJobConf,
-                  localTaskDir));
-            }  
-            else {
-              // Jvm reuse. We don't delete the workdir since some other task
-              // (running in the same JVM) might be using the dir. The JVM
-              // running the tasks would clean the workdir per a task in the
-              // task process itself.
-              directoryCleanupThread.addToQueue(localFs, getLocalFiles(
-                  defaultJobConf, localTaskDir + Path.SEPARATOR
-                      + TaskTracker.JOBFILE));
-            }
-          } else {
-            if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
-              directoryCleanupThread.addToQueue(localFs,
-                  getLocalFiles(defaultJobConf,
-                  taskWorkDir));
-            }  
-          }
+          removeTaskFiles(needCleanup, taskId);
         } catch (Throwable ie) {
-          LOG.info("Error cleaning up task runner: " + 
-                   StringUtils.stringifyException(ie));
+          LOG.info("Error cleaning up task runner: "
+              + StringUtils.stringifyException(ie));
+        }
+      }
+    }
+
+    /**
+     * Some or all of the files from this task are no longer required. Queue
+     * them for removal on the directory cleanup thread.
+     * 
+     * Reads localJobConf without a null check; the caller visible in this
+     * file returns early when localJobConf is null.
+     * 
+     * @param needCleanup if true, close the runner (when there is one) and
+     *          queue the task's local directory, or only the job file under
+     *          it when JVMs are reused; if false, queue only the task's work
+     *          directory, and only when JVMs are not reused
+     * @param taskId the attempt whose directories are to be removed
+     * @throws IOException propagated from the runner's close, getLocalFiles
+     *           or addToQueue
+     */
+    void removeTaskFiles(boolean needCleanup, TaskAttemptID taskId)
+        throws IOException {
+      if (needCleanup) {
+        if (runner != null) {
+          // cleans up the output directory of the task (where map outputs
+          // and reduce inputs get stored)
+          runner.close();
+        }
+
+        String localTaskDir =
+            getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId
+                .toString(), task.isTaskCleanupTask());
+        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+          // No jvm reuse, remove everything
+          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+              defaultJobConf, localTaskDir));
+        } else {
+          // Jvm reuse. We don't delete the workdir since some other task
+          // (running in the same JVM) might be using the dir. The JVM
+          // running the tasks would clean the workdir per a task in the
+          // task process itself.
+          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+              defaultJobConf, localTaskDir + Path.SEPARATOR
+                  + TaskTracker.JOBFILE));
+        }
+      } else {
+        // no cleanup requested: only the work directory goes, and only when
+        // no other task can share it through JVM reuse
+        if (localJobConf.getNumTasksToExecutePerJvm() == 1) {
+          String taskWorkDir =
+              getTaskWorkDir(task.getUser(), task.getJobID().toString(),
+                  taskId.toString(), task.isTaskCleanupTask());
+          directoryCleanupThread.addToQueue(localFs, getLocalFiles(
+              defaultJobConf, taskWorkDir));
         }
       }
     }
@@ -3040,6 +2923,17 @@
     purgeTask(tip, true);
   }
 
+  /** 
+   * A child task had a fatal error. Kill the task.
+   * 
+   * The error is logged, recorded in the task's diagnostics and the task
+   * is purged. A report for an attempt that is not in the running-tasks
+   * map is logged and otherwise ignored.
+   * 
+   * @param taskId the attempt that reported the error
+   * @param msg the error message supplied by the child
+   * @throws IOException propagated from purging the task
+   */  
+  public synchronized void fatalError(TaskAttemptID taskId, String msg) 
+  throws IOException {
+    LOG.fatal("Task: " + taskId + " - exited : " + msg);
+    TaskInProgress tip = runningTasks.get(taskId);
+    if (tip == null) {
+      // Map.get returns null for an attempt this tracker is not running;
+      // there is nothing to purge, so do not fail with a
+      // NullPointerException.
+      LOG.warn("Unknown child task reported a fatal error: " + taskId
+          + ". Ignored.");
+      return;
+    }
+    tip.reportDiagnosticInfo("Error: " + msg);
+    purgeTask(tip, true);
+  }
+
   public synchronized MapTaskCompletionEventsUpdate getMapCompletionEvents(
       JobID jobId, int fromEventId, int maxLocs, TaskAttemptID id) 
   throws IOException {
@@ -3076,9 +2970,7 @@
       tip = tasks.get(taskid);
     }
     if (tip != null) {
-      if (!commitPending) {
-        tip.reportTaskFinished();
-      }
+      tip.reportTaskFinished(commitPending);
     } else {
       LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
     }
@@ -3109,6 +3001,7 @@
     boolean localized;
     boolean keepJobFiles;
     FetchStatus f;
+    JobTokens jobTokens;
     RunningJob(JobID jobid) {
       this.jobid = jobid;
       localized = false;
@@ -3262,7 +3155,7 @@
       JobConf conf=new JobConf();
       // enable the server to track time spent waiting on locks
       ReflectionUtils.setContentionTracing
-        (conf.getBoolean("tasktracker.contention.tracking", false));
+        (conf.getBoolean(TT_CONTENTION_TRACKING, false));
       TaskTracker tracker = new TaskTracker(conf, false);
       Service.startService(tracker);
       tracker.run();
@@ -3284,147 +3177,248 @@
     public void doGet(HttpServletRequest request, 
                       HttpServletResponse response
                       ) throws ServletException, IOException {
-      TaskAttemptID reduceAttId = null;
-      String mapId = request.getParameter("map");
+      // Serves the outputs of the maps listed (comma-separated) in the "map"
+      // parameter for the reduce partition given by "reduce", after the
+      // request has been verified against the job named by "job".
+      long start = System.currentTimeMillis();
+      String mapIds = request.getParameter("map");
       String reduceId = request.getParameter("reduce");
       String jobId = request.getParameter("job");
 
+      LOG.debug("Shuffle started for maps (mapIds=" + mapIds + ") to reduce " + 
+               reduceId);
+
       if (jobId == null) {
         throw new IOException("job parameter is required");
       }
 
-      if (mapId == null || reduceId == null) {
+      if (mapIds == null || reduceId == null) {
         throw new IOException("map and reduce parameters are required");
       }
-      try {
-        reduceAttId = TaskAttemptID.forName(reduceId);
-      } catch (IllegalArgumentException e) {
-        throw new IOException("reduce attempt ID is malformed");
-      }
+      
       ServletContext context = getServletContext();
-      int reduce = reduceAttId.getTaskID().getId();
-      byte[] buffer = new byte[MAX_BYTES_TO_READ];
-      // true iff IOException was caused by attempt to access input
-      boolean isInputException = true;
-      OutputStream outStream = null;
-      FSDataInputStream mapOutputIn = null;
+      // The reduce id comes straight from the request: report a malformed
+      // value as an IOException rather than letting the unchecked
+      // NumberFormatException escape.
+      int reduce;
+      try {
+        reduce = Integer.parseInt(reduceId);
+      } catch (NumberFormatException nfe) {
+        throw new IOException("reduce parameter is malformed: " + reduceId,
+                              nfe);
+      }
+      DataOutputStream outStream = null;
  
-      long totalRead = 0;
       ShuffleServerMetrics shuffleMetrics =
         (ShuffleServerMetrics) context.getAttribute("shuffleServerMetrics");
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
 
-      long startTime = 0;
+      verifyRequest(request, response, tracker, jobId);
+      
+      int numMaps = 0;
       try {
         shuffleMetrics.serverHandlerBusy();
-        if(ClientTraceLog.isInfoEnabled())
-          startTime = System.nanoTime();
-        outStream = response.getOutputStream();
+        outStream = new DataOutputStream(response.getOutputStream());
+        //use the same buffersize as used for reading the data from disk
+        response.setBufferSize(MAX_BYTES_TO_READ);
         JobConf conf = (JobConf) context.getAttribute("conf");
         LocalDirAllocator lDirAlloc = 
           (LocalDirAllocator)context.getAttribute("localDirAllocator");
         FileSystem rfs = ((LocalFileSystem)
             context.getAttribute("local.file.system")).getRaw();
 
-        // Index file
-        Path indexFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getIntermediateOutputDir(jobId, mapId)
-            + "/file.out.index", conf);
-        
-        // Map-output file
-        Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getIntermediateOutputDir(jobId, mapId)
-            + "/file.out", conf);
+        // Split the map ids, send output for one map at a time
+        StringTokenizer itr = new StringTokenizer(mapIds, ",");
+        while(itr.hasMoreTokens()) {
+          String mapId = itr.nextToken();
+          ++numMaps;
+          sendMapFile(jobId, mapId, reduce, conf, outStream,
+                      tracker, lDirAlloc, shuffleMetrics, rfs);
+        }
+      } catch (IOException ie) {
+        Log log = (Log) context.getAttribute("log");
+        String errorMsg = ("getMapOutputs(" + mapIds + "," + reduceId + 
+                           ") failed");
+        log.warn(errorMsg, ie);
+        response.sendError(HttpServletResponse.SC_GONE, errorMsg);
+        shuffleMetrics.failedOutput();
+        throw ie;
+      } finally {
+        shuffleMetrics.serverHandlerFree();
+      }
+      outStream.close();
+      shuffleMetrics.successOutput();
+      // elapsed wall-clock time in milliseconds
+      long timeElapsed = (System.currentTimeMillis()-start);
+      LOG.info("Shuffled " + numMaps
+          + " maps (mapIds=" + mapIds + ") to reduce "
+          + reduceId + " in " + timeElapsed + "ms");
+
+      if (ClientTraceLog.isInfoEnabled()) {
+        ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
+            request.getLocalAddr() + ":" + request.getLocalPort(),
+            request.getRemoteAddr() + ":" + request.getRemotePort(),
+            numMaps, "MAPRED_SHUFFLE", reduceId,
+            timeElapsed));
+      }
+    }
 
+    private void sendMapFile(String jobId, String mapId,
+                             int reduce,
+                             Configuration conf,
+                             DataOutputStream outStream,
+                             TaskTracker tracker,
+                             LocalDirAllocator lDirAlloc,
+                             ShuffleServerMetrics shuffleMetrics,
+                             FileSystem localfs
+                             ) throws IOException {
+      
+      LOG.debug("sendMapFile called for " + mapId + " to reduce " + reduce);
+      
+      // true iff IOException was caused by attempt to access input
+      boolean isInputException = false;
+      FSDataInputStream mapOutputIn = null;
+      byte[] buffer = new byte[MAX_BYTES_TO_READ];
+      long totalRead = 0;
+
+      String userName = null;
+      synchronized (tracker.runningJobs) {
+        RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
+        if (rjob == null) {
+          throw new IOException("Unknown job " + jobId + "!!");
+        }
+        userName = rjob.jobConf.getUser();
+      }
+      // Index file
+      Path indexFileName =
+          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+              userName, jobId, mapId)
+              + "/file.out.index", conf);
+
+      // Map-output file
+      Path mapOutputFileName =
+          lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+              userName, jobId, mapId)
+              + "/file.out", conf);
+
+      /**
+       * Read the index file to get the information about where the map-output
+       * for the given reducer is available.
+       */
+      IndexRecord info = 
+        tracker.indexCache.getIndexInformation(mapId, reduce, indexFileName);
+      
+      try {
         /**
-         * Read the index file to get the information about where
-         * the map-output for the given reducer is available. 
-         */
-        IndexRecord info = 
-          tracker.indexCache.getIndexInformation(mapId, reduce,indexFileName);
-          
-        //set the custom "from-map-task" http header to the map task from which
-        //the map output data is being transferred
-        response.setHeader(FROM_MAP_TASK, mapId);
-        
-        //set the custom "Raw-Map-Output-Length" http header to 
-        //the raw (decompressed) length
-        response.setHeader(RAW_MAP_OUTPUT_LENGTH,
-            Long.toString(info.rawLength));
-
-        //set the custom "Map-Output-Length" http header to 
-        //the actual number of bytes being transferred
-        response.setHeader(MAP_OUTPUT_LENGTH,
-            Long.toString(info.partLength));
-
-        //set the custom "for-reduce-task" http header to the reduce task number
-        //for which this map output is being transferred
-        response.setHeader(FOR_REDUCE_TASK, Integer.toString(reduce));
-        
-        //use the same buffersize as used for reading the data from disk
-        response.setBufferSize(MAX_BYTES_TO_READ);
-        
-        /**
-         * Read the data from the sigle map-output file and
+         * Read the data from the single map-output file and
          * send it to the reducer.
          */
         //open the map-output file
-        mapOutputIn = rfs.open(mapOutputFileName);
-
+        mapOutputIn = localfs.open(mapOutputFileName);
         //seek to the correct offset for the reduce
         mapOutputIn.seek(info.startOffset);
+        
+        // write header for each map output
+        ShuffleHeader header = new ShuffleHeader(mapId, info.partLength,
+            info.rawLength, reduce);
+        header.write(outStream);
+
+        // read the map-output and stream it out
+        isInputException = true;
         long rem = info.partLength;
+        if (rem == 0) {
+          throw new IOException("Illegal partLength of 0 for mapId " + mapId + 
+                                " to reduce " + reduce);
+        }
         int len =
           mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
-        while (rem > 0 && len >= 0) {
+        long now = 0;
+        while (len >= 0) {
           rem -= len;
           try {
             shuffleMetrics.outputBytes(len);
-            outStream.write(buffer, 0, len);
-            outStream.flush();
+            
+            if (len > 0) {
+              outStream.write(buffer, 0, len);
+            } else {
+              LOG.info("Skipped zero-length read of map " + mapId + 
+                       " to reduce " + reduce);
+            }
+            
           } catch (IOException ie) {
             isInputException = false;
             throw ie;
           }
           totalRead += len;
+          if (rem == 0) {
+            break;
+          }
           len =
             mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
-
-        LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce + 
-                 " from map: " + mapId + " given " + info.partLength + "/" + 
-                 info.rawLength);
+        
+        mapOutputIn.close();
       } catch (IOException ie) {
-        Log log = (Log) context.getAttribute("log");
-        String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 
-                           ") failed :\n"+
-                           StringUtils.stringifyException(ie));
-        log.warn(errorMsg);
+        String errorMsg = "error on sending map " + mapId + " to reduce " + 
+                          reduce;
         if (isInputException) {
-          tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
+          tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg + 
+                                StringUtils.stringifyException(ie));
         }
-        response.sendError(HttpServletResponse.SC_GONE, errorMsg);
-        shuffleMetrics.failedOutput();
-        throw ie;
-      } finally {
-        if (null != mapOutputIn) {
-          mapOutputIn.close();
+        if (mapOutputIn != null) {
+          try {
+            mapOutputIn.close();
+          } catch (IOException ioe) {
+            LOG.info("problem closing map output file", ioe);
+          }
         }
-        final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
-        shuffleMetrics.serverHandlerFree();
-        if (ClientTraceLog.isInfoEnabled()) {
-          ClientTraceLog.info(String.format(MR_CLIENTTRACE_FORMAT,
-                request.getLocalAddr() + ":" + request.getLocalPort(),
-                request.getRemoteAddr() + ":" + request.getRemotePort(),
-                totalRead, "MAPRED_SHUFFLE", mapId, reduceId,
-                endTime-startTime));
+        throw new IOException(errorMsg, ie);
+      }
+      
+      LOG.info("Sent out " + totalRead + " bytes to reduce " + reduce + 
+          " from map: " + mapId + " given " + info.partLength + "/" + 
+          info.rawLength);
+    }
+    
+    /**
+     * Verify that the request carries the correct HASH for the url, and
+     * add a field to the reply header with the hash of that HASH.
+     * 
+     * The job's tokens are looked up in the tracker's running jobs. Missing
+     * tokens, a missing hash header or a failed verification are answered
+     * with SC_UNAUTHORIZED before the exception is thrown.
+     * 
+     * @param request the fetcher's request; supplies the message to verify
+     *          and the url-hash header
+     * @param response receives SC_UNAUTHORIZED on failure, or the reply-hash
+     *          header on success
+     * @param tracker the tasktracker whose runningJobs map is consulted
+     * @param jobId the string form of the job the request is made for
+     * @throws IOException if the job is unknown or has no tokens, the hash
+     *           header is missing, or the verification fails
+     */
+    private void verifyRequest(HttpServletRequest request, 
+        HttpServletResponse response, TaskTracker tracker, String jobId) 
+    throws IOException {
+      JobTokens jt = null;
+      synchronized (tracker.runningJobs) {
+        RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId));
+        if (rjob == null) {
+          throw new IOException("Unknown job " + jobId + "!!");
         }
+        jt = rjob.jobTokens;
       }
-      outStream.close();
-      shuffleMetrics.successOutput();
+      if (jt == null) {
+        // jobTokens is declared without an initializer, so it is null until
+        // something assigns it; reject the request instead of failing with
+        // a NullPointerException further down.
+        response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+        throw new IOException("No job tokens available for job " + jobId);
+      }
+      // string to encrypt
+      String enc_str = SecureShuffleUtils.buildMsgFrom(request);
+      
+      // hash from the fetcher
+      String urlHashStr = request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+      if(urlHashStr == null) {
+        response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+        throw new IOException("fetcher cannot be authenticated");
+      }
+      if (LOG.isDebugEnabled()) {
+        // Log only the trailing half of the hash. substring(begin) is safe
+        // for any length, including an empty or one-character header value
+        // sent by an unauthenticated fetcher.
+        int len = urlHashStr.length();
+        LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+
+            urlHashStr.substring(len-len/2));
+      }
+
+      SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken());
+      // verify - throws exception
+      try {
+        ssutil.verifyReply(urlHashStr, enc_str);
+      } catch (IOException ioe) {
+        response.sendError(HttpServletResponse.SC_UNAUTHORIZED);
+        throw ioe;
+      }
+      
+      // verification passed - encode the reply
+      String reply = ssutil.generateHash(urlHashStr.getBytes());
+      response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      
+      if (LOG.isDebugEnabled()) {
+        int replyLen = reply.length();
+        LOG.debug("Fetcher request verfied. enc_str="+enc_str+";reply="
+            +reply.substring(replyLen-replyLen/2));
+      }
     }
   }
+  
 
   // get the full paths of the directory in all the local disks.
   private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{
@@ -3522,7 +3516,7 @@
     }
 
     Class<? extends MemoryCalculatorPlugin> clazz =
-        fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
+        fConf.getClass(TT_MEMORY_CALCULATOR_PLUGIN,
             null, MemoryCalculatorPlugin.class);
     MemoryCalculatorPlugin memoryCalculatorPlugin =
         MemoryCalculatorPlugin
@@ -3546,11 +3540,11 @@
 
     mapSlotMemorySizeOnTT =
         fConf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            MAPMEMORY_MB,
             JobConf.DISABLED_MEMORY_LIMIT);
     reduceSlotSizeMemoryOnTT =
         fConf.getLong(
-            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            REDUCEMEMORY_MB,
             JobConf.DISABLED_MEMORY_LIMIT);
     totalMemoryAllottedForTasks =
         maxMapSlots * mapSlotMemorySizeOnTT + maxReduceSlots
@@ -3656,66 +3650,99 @@
     return distributedCacheManager;
   }
 
-  /**
-   * Thread that handles cleanup
-   */
-  private class TaskCleanupThread extends Daemon {
-
     /**
-     * flag to halt work
+     * Download the job-token file from the FS and save on local fs.
+     * @param user
+     * @param jobId
+     * @param jobConf job configuration; the local path of the downloaded
+     *                file is stored in it under JobContext.JOB_TOKEN_FILE
+     * @throws IOException
      */
-    private volatile boolean live = true;
+    private void localizeJobTokenFile(String user, JobID jobId, JobConf jobConf)
+        throws IOException {
+      // check if the tokenJob file is there..
+      Path skPath = new Path(systemDirectory,
+          jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME);
 
+      FileStatus status = null;
+      long jobTokenSize = -1;
+      status = systemFS.getFileStatus(skPath); //throws FileNotFoundException
+      jobTokenSize = status.getLen();
 
-    /**
-     * Construct a daemon thread.
-     */
-    private TaskCleanupThread() {
-      setName("Task Tracker Task Cleanup Thread");
-    }
+      Path localJobTokenFile =
+          lDirAlloc.getLocalPathForWrite(getLocalJobTokenFile(user,
+              jobId.toString()), jobTokenSize, fConf);
 
-    /**
-     * End the daemon. This is done by setting the live flag to false and
-     * interrupting ourselves.
-     */
-    public void terminate() {
-      live = false;
-      interrupt();
+      LOG.debug("localizingJobTokenFile from sd="+skPath.toUri().getPath() +
+          " to " + localJobTokenFile.toUri().getPath());
+
+      // Download job_token
+      systemFS.copyToLocalFile(skPath, localJobTokenFile);
+      // set it into jobConf to transfer the name to TaskRunner
+      jobConf.set(JobContext.JOB_TOKEN_FILE,localJobTokenFile.toString());
     }
 
+
     /**
-     * process task kill actions until told to stop being live.
+     * Thread that handles cleanup
      */
-    public void run() {
-      LOG.debug("Task cleanup thread started");
-      while (live) {
-        try {
-          TaskTrackerAction action = tasksToCleanup.take();
-          if (action instanceof KillJobAction) {
-            purgeJob((KillJobAction) action);
-          } else if (action instanceof KillTaskAction) {
-            TaskInProgress tip;
-            KillTaskAction killAction = (KillTaskAction) action;
-            synchronized (TaskTracker.this) {
-              tip = tasks.get(killAction.getTaskID());
+    private class TaskCleanupThread extends Daemon {
+
+        /**
+         * flag to halt work
+         */
+        private volatile boolean live = true;
+
+
+        /**
+         * Construct a daemon thread.
+         */
+        private TaskCleanupThread() {
+            setName("Task Tracker Task Cleanup Thread");
+        }
+
+        /**
+         * End the daemon. This is done by setting the live flag to false and
+         * interrupting ourselves.
+         */
+        public void terminate() {
+            live = false;
+            interrupt();
+        }
+
+        /**
+         * process task kill actions until told to stop being live.
+         */
+        public void run() {
+            LOG.debug("Task cleanup thread started");
+            while (live) {
+                try {
+                    TaskTrackerAction action = tasksToCleanup.take();
+                    if (action instanceof KillJobAction) {
+                        purgeJob((KillJobAction) action);
+                    } else if (action instanceof KillTaskAction) {
+                        TaskInProgress tip;
+                        KillTaskAction killAction = (KillTaskAction) action;
+                        synchronized (TaskTracker.this) {
+                            tip = tasks.get(killAction.getTaskID());
+                        }
+                        LOG.info("Received KillTaskAction for task: " +
+                                killAction.getTaskID());
+                        purgeTask(tip, false);
+                    } else {
+                        LOG.error("Non-delete action given to cleanup thread: "
+                                + action);
+                    }
+                } catch (InterruptedException except) {
+                    //interrupted. this may have reset the live flag
+                } catch (Throwable except) {
+                    LOG.warn("Exception in Cleanup thread: " + except,
+                            except);
+                }
             }
-            LOG.info("Received KillTaskAction for task: " +
-                    killAction.getTaskID());
-            purgeTask(tip, false);
-          } else {
-            LOG.error("Non-delete action given to cleanup thread: "
-                    + action);
-          }
-        } catch (InterruptedException except) {
-          //interrupted. this may have reset the live flag                
-        } catch (Throwable except) {
-          LOG.warn("Exception in Cleanup thread: " + except,
-                  except);
+            LOG.debug("Task cleanup thread ending");
         }
-      }
-      LOG.debug("Task cleanup thread ending");
-    }
 
-  }
+    }
 
 }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerInstrumentation.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerMetricsInst.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Sat Nov 28 20:26:01 2009
@@ -27,7 +27,7 @@
  * parent is a daemon which which polls the central master for a new map or
  * reduce task and runs it as a child process.  All communication between child
  * and parent is via this protocol. */ 
-interface TaskUmbilicalProtocol extends VersionedProtocol {
+public interface TaskUmbilicalProtocol extends VersionedProtocol {
 
   /** 
    * Changed the version to 2, since we have a new method getMapOutputs 
@@ -56,9 +56,10 @@
    * Version 16 Change in signature of getTask() for HADOOP-5488
    * Version 17 Modified TaskID to be aware of the new TaskTypes
    * Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516
+   * Version 19 Added fatalError for child to communicate fatal errors to TT
    * */
 
-  public static final long versionID = 18L;
+  public static final long versionID = 19L;
   
   /**
    * Called when a child task process starts, to get its task.
@@ -132,13 +133,15 @@
   /** Report that the task encounted a local filesystem error.*/
   void fsError(TaskAttemptID taskId, String message) throws IOException;
 
+  /** Report that the task encountered a fatal error.*/
+  void fatalError(TaskAttemptID taskId, String message) throws IOException;
+  
   /** Called by a reduce task to get the map output locations for finished maps.
    * Returns an update centered around the map-task-completion-events. 
    * The update also piggybacks the information whether the events copy at the 
    * task-tracker has changed or not. This will trigger some action at the 
    * child-process.
    *
-   * @param taskId the reduce task id
    * @param fromIndex the index starting from which the locations should be 
    * fetched
    * @param maxLocs the max number of locations to fetch

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextInputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextInputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextInputFormat.java Sat Nov 28 20:26:01 2009
@@ -42,7 +42,11 @@
   }
   
   protected boolean isSplitable(FileSystem fs, Path file) {
-    return compressionCodecs.getCodec(file) == null;
+    final CompressionCodec codec = compressionCodecs.getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
   }
 
   public RecordReader<LongWritable, Text> getRecordReader(

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -113,7 +113,7 @@
                                                   Progressable progress)
     throws IOException {
     boolean isCompressed = getCompressOutput(job);
-    String keyValueSeparator = job.get("mapred.textoutputformat.separator", 
+    String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", 
                                        "\t");
     if (!isCompressed) {
       Path file = FileOutputFormat.getTaskOutputPath(job, name);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,5 @@
-/** * Licensed to the Apache Software Foundation (ASF) under one
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java Sat Nov 28 20:26:01 2009
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapred.lib;
 
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.io.DefaultStringifier;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -30,19 +32,45 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 
 /**
  * The Chain class provides all the common functionality for the
  * {@link ChainMapper} and the {@link ChainReducer} classes.
- * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.chain.Chain} instead
  */
-@Deprecated
-class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain {
+class Chain {
+  private static final String CHAIN_MAPPER = "chain.mapper";
+  private static final String CHAIN_REDUCER = "chain.reducer";
+
+  private static final String CHAIN_MAPPER_SIZE = ".size";
+  private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
+  private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
+  private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
+  private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
 
   private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
   private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
 
+  private static final String MAPPER_INPUT_KEY_CLASS =
+    "chain.mapper.input.key.class";
+  private static final String MAPPER_INPUT_VALUE_CLASS =
+    "chain.mapper.input.value.class";
+  private static final String MAPPER_OUTPUT_KEY_CLASS =
+    "chain.mapper.output.key.class";
+  private static final String MAPPER_OUTPUT_VALUE_CLASS =
+    "chain.mapper.output.value.class";
+  private static final String REDUCER_INPUT_KEY_CLASS =
+    "chain.reducer.input.key.class";
+  private static final String REDUCER_INPUT_VALUE_CLASS =
+    "chain.reducer.input.value.class";
+  private static final String REDUCER_OUTPUT_KEY_CLASS =
+    "chain.reducer.output.key.class";
+  private static final String REDUCER_OUTPUT_VALUE_CLASS =
+    "chain.reducer.output.value.class";
+
+  private boolean isMap;
+
   private JobConf chainJobConf;
 
   private List<Mapper> mappers = new ArrayList<Mapper>();
@@ -64,7 +92,51 @@
    *              Reducer.
    */
   Chain(boolean isMap) {
-    super(isMap);
+    this.isMap = isMap;
+  }
+
+  /**
+   * Returns the prefix to use for the configuration of the chain depending
+   * if it is for a Mapper or a Reducer.
+   *
+   * @param isMap TRUE for Mapper, FALSE for Reducer.
+   * @return the prefix to use.
+   */
+  private static String getPrefix(boolean isMap) {
+    return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
+  }
+
+  /**
+   * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
+   * <p/>
+   * It creates a new JobConf using the chain job's JobConf as base and adds to
+   * it the configuration properties for the chain element. The keys of the
+   * chain element jobConf have precedence over the given JobConf.
+   *
+   * @param jobConf the chain job's JobConf.
+   * @param confKey the key for chain element configuration serialized in the
+   *                chain job's JobConf.
+   * @return a new JobConf aggregating the chain job's JobConf with the chain
+   *         element configuration properties.
+   */
+  private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
+    JobConf conf;
+    try {
+      Stringifier<JobConf> stringifier =
+        new DefaultStringifier<JobConf>(jobConf, JobConf.class);
+      conf = stringifier.fromString(jobConf.get(confKey, null));
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex);
+    }
+    // we have to do this because the Writable deserialization clears all
+    // values set in the conf, making it impossible to do a new
+    // JobConf(jobConf) in the creation of the conf above
+    jobConf = new JobConf(jobConf);
+
+    for(Map.Entry<String, String> entry : conf) {
+      jobConf.set(entry.getKey(), entry.getValue());
+    }
+    return jobConf;
   }
 
   /**
@@ -97,27 +169,82 @@
     String prefix = getPrefix(isMap);
 
     // if a reducer chain check the Reducer has been already set
-    checkReducerAlreadySet(isMap, jobConf, prefix, true);
-	    
-    // set the mapper class
-    int index = getIndex(jobConf, prefix);
+    if (!isMap) {
+      if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
+                           Reducer.class) == null) {
+        throw new IllegalStateException(
+          "A Mapper can be added to the chain only after the Reducer has " +
+          "been set");
+      }
+    }
+    int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
     jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
-	    
-    validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass,
-      outputKeyClass, outputValueClass, index, prefix);
-	    
+
+    // if it is a reducer chain and the first Mapper is being added check the
+    // key and value input classes of the mapper match those of the reducer
+    // output.
+    if (!isMap && index == 0) {
+      JobConf reducerConf =
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      if (! inputKeyClass.isAssignableFrom(
+        reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output key class does" +
+          " not match the Mapper input key class");
+      }
+      if (! inputValueClass.isAssignableFrom(
+        reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output value class" +
+          " does not match the Mapper input value class");
+      }
+    } else if (index > 0) {
+      // check that the key and value input classes of the new Mapper in the
+      // chain match those of the previous Mapper output.
+      JobConf previousMapperConf =
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
+          (index - 1));
+      if (! inputKeyClass.isAssignableFrom(
+        previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper output key class does" +
+          " not match the previous Mapper input key class");
+      }
+      if (! inputValueClass.isAssignableFrom(
+        previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper output value class" +
+          " does not match the previous Mapper input value class");
+      }
+    }
+
     // if the Mapper does not have a private JobConf create an empty one
     if (mapperConf == null) {
-    // using a JobConf without defaults to make it lightweight.
-    // still the chain JobConf may have all defaults and this conf is
-    // overlapped to the chain JobConf one.
+      // using a JobConf without defaults to make it lightweight.
+      // still the chain JobConf may have all defaults and this conf is
+      // overlapped to the chain JobConf one.
       mapperConf = new JobConf(true);
     }
-    // store in the private mapper conf if it works by value or by reference
+
+    // store in the private mapper conf the input/output classes of the mapper
+    // and if it works by value or by reference
     mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
-    
-    setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass,
-	      outputKeyClass, outputValueClass, mapperConf, index, prefix);
+    mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
+                        Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
+                        Object.class);
+
+    // serialize the private mapper jobconf in the chain jobconf.
+    Stringifier<JobConf> stringifier =
+      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
+    try {
+      jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
+                  stringifier.toString(new JobConf(mapperConf)));
+    }
+    catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+
+    // increment the chain counter
+    jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
   }
 
   /**
@@ -146,10 +273,13 @@
                           Class<? extends V2> outputValueClass,
                           boolean byValue, JobConf reducerConf) {
     String prefix = getPrefix(false);
-    checkReducerAlreadySet(false, jobConf, prefix, false);
+
+    if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
+      throw new IllegalStateException("Reducer has been already set");
+    }
 
     jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
-    
+
     // if the Reducer does not have a private JobConf create an empty one
     if (reducerConf == null) {
       // using a JobConf without defaults to make it lightweight.
@@ -161,9 +291,24 @@
     // store in the private reducer conf the input/output classes of the reducer
     // and if it works by value or by reference
     reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
-
-    setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass,
-      outputValueClass, reducerConf, prefix);
+    reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
+                         Object.class);
+    reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
+                         Object.class);
+    reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
+                         Object.class);
+
+    // serialize the private reducer jobconf in the chain jobconf.
+    Stringifier<JobConf> stringifier =
+      new DefaultStringifier<JobConf>(jobConf, JobConf.class);
+    try {
+      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
+                  stringifier.toString(new JobConf(reducerConf)));
+    }
+    catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
   }
 
   /**
@@ -180,8 +325,8 @@
     for (int i = 0; i < index; i++) {
       Class<? extends Mapper> klass =
         jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
-      JobConf mConf = new JobConf(
-        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i));
+      JobConf mConf =
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
       Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
       mappers.add(mapper);
 
@@ -198,8 +343,8 @@
     Class<? extends Reducer> klass =
       jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
     if (klass != null) {
-      JobConf rConf = new JobConf(
-        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG));
+      JobConf rConf =
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
       reducer = ReflectionUtils.newInstance(klass, rConf);
       if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
         reducerKeySerialization = serializationFactory

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java Sat Nov 28 20:26:01 2009
@@ -86,10 +86,7 @@
  * RunningJob job = jc.submitJob(conf);
  * ...
  * </pre>
- * @deprecated 
- * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainMapper} instead
  */
-@Deprecated
 public class ChainMapper implements Mapper {
 
   /**