Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/05 18:39:28 UTC

svn commit: r741203 [1/2] - in /hadoop/core/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/lib/output/ src/test/org/apache/hadoop/mapred/ src/webapps/job/

Author: ddas
Date: Thu Feb  5 17:39:27 2009
New Revision: 741203

URL: http://svn.apache.org/viewvc?rev=741203&view=rev
Log:
HADOOP-4759. Removes temporary output directory for failed and killed tasks by launching special CLEANUP tasks for the same. Contributed by Amareshwari Sriramadasu.
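
At a high level, the patch introduces FAILED_UNCLEAN and KILLED_UNCLEAN task
states: when an attempt fails or is killed, it first enters an UNCLEAN state,
the JobTracker schedules a task-cleanup attempt to discard the attempt's
temporary output, and only then does the attempt settle into FAILED or KILLED.
A minimal sketch of what the cleanup step ultimately does, modelled on the
abortTask() change below (runCleanup() itself is an illustrative wrapper, not
code from the patch):

  import java.io.IOException;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapred.JobConf;

  public class TaskCleanupSketch {
    /** Best-effort removal of a task attempt's temporary output directory. */
    static void runCleanup(JobConf conf, Path taskOutputPath) {
      try {
        if (taskOutputPath != null) {       // cleanup attempts may have no temp dir
          FileSystem fs = taskOutputPath.getFileSystem(conf);
          fs.delete(taskOutputPath, true);  // recursive delete, as in abortTask()
        }
      } catch (IOException ie) {
        // discarding output is best-effort; failure here must not fail the attempt
      }
    }
  }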

Added:
    hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java
Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu Feb  5 17:39:27 2009
@@ -594,6 +594,10 @@
     HADOOP-5085. Copying a file to local with Crc throws an exception.
     (hairong)
 
+    HADOOP-4759. Removes temporary output directory for failed and
+    killed tasks by launching special CLEANUP tasks for the same. 
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java Thu Feb  5 17:39:27 2009
@@ -47,6 +47,7 @@
     LogFactory.getLog(TaskTracker.class);
 
   static volatile TaskAttemptID taskid;
+  static volatile boolean isCleanup;
 
   public static void main(String[] args) throws Throwable {
     LOG.debug("Child starting");
@@ -75,7 +76,7 @@
           try {
             Thread.sleep(5000);
             if (taskid != null) {
-              TaskLog.syncLogs(firstTaskid, taskid);
+              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
             }
           } catch (InterruptedException ie) {
           } catch (IOException iee) {
@@ -95,6 +96,7 @@
     Path srcPidPath = null;
     Path dstPidPath = null;
     int idleLoopCount = 0;
+    Task task = null;
     try {
       while (true) {
         JvmTask myTask = umbilical.getTask(jvmId);
@@ -114,22 +116,21 @@
           }
         }
         idleLoopCount = 0;
-        Task task = myTask.getTask();
+        task = myTask.getTask();
         taskid = task.getTaskID();
-        
+        isCleanup = task.isTaskCleanupTask();
         //create the index file so that the log files 
         //are viewable immediately
-        TaskLog.syncLogs(firstTaskid, taskid);
+        TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
         if (job.getBoolean("task.memory.mgmt.enabled", false)) {
           if (srcPidPath == null) {
-            srcPidPath = TaskMemoryManagerThread.getPidFilePath(firstTaskid,
-                                                              job);
+            srcPidPath = new Path(task.getPidFile());
           }
           //since the JVM is running multiple tasks potentially, we need
           //to do symlink stuff only for the subsequent tasks
           if (!taskid.equals(firstTaskid)) {
-            dstPidPath = new Path(srcPidPath.getParent(), taskid.toString());
+            dstPidPath = new Path(task.getPidFile());
             FileUtil.symLink(srcPidPath.toUri().getPath(), 
                 dstPidPath.toUri().getPath());
           }
@@ -154,9 +155,10 @@
         try {
           task.run(job, umbilical);             // run the task
         } finally {
-          TaskLog.syncLogs(firstTaskid, taskid);
+          TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
           if (!taskid.equals(firstTaskid) && 
               job.getBoolean("task.memory.mgmt.enabled", false)) {
+            // delete the pid-file's symlink
             new File(dstPidPath.toUri().getPath()).delete();
           }
         }
@@ -169,6 +171,10 @@
       umbilical.fsError(taskid, e.getMessage());
     } catch (Throwable throwable) {
       LOG.warn("Error running child", throwable);
+      if (task != null) {
+        // do cleanup for the task
+        task.taskCleanup(umbilical);
+      }
       // Report back any failures, for diagnostic purposes
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       throwable.printStackTrace(new PrintStream(baos));
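
Beyond the log-syncing and pid-file changes, the key Child.java change is the
crash path: the task reference is hoisted out of the loop so that if task.run()
throws, the child can clean up before reporting the error. The pattern in
isolation (a condensed sketch; Task, JobConf and TaskUmbilicalProtocol are the
patch's types, runChildTask() is illustrative):

  // Condensed sketch of the crash-path cleanup added to Child.main().
  static void runChildTask(Task task, JobConf job, TaskUmbilicalProtocol umbilical)
      throws Throwable {
    try {
      task.run(job, umbilical);
    } catch (Throwable throwable) {
      // delete the attempt's temporary output before reporting the failure
      task.taskCleanup(umbilical);
      throw throwable;  // the real code reports diagnostics via the umbilical instead
    }
  }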

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java Thu Feb  5 17:39:27 2009
@@ -132,9 +132,11 @@
   public void abortTask(TaskAttemptContext context) {
     Path taskOutputPath =  getTempTaskOutputPath(context);
     try {
-      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
-      context.getProgressible().progress();
-      fs.delete(taskOutputPath, true);
+      if (taskOutputPath != null) {
+        FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+        context.getProgressible().progress();
+        fs.delete(taskOutputPath, true);
+      }
     } catch (IOException ie) {
       LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
     }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Thu Feb  5 17:39:27 2009
@@ -59,8 +59,9 @@
    *             in heartbeat method (HADOOP-4305) 
    * Version 23: Added parameter 'initialContact' again in heartbeat method
    *            (HADOOP-4869) 
+   * Version 24: Changed format of Task and TaskStatus for HADOOP-4759 
    */
-  public static final long versionID = 23L;
+  public static final long versionID = 24L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
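
The version bump matters because Hadoop RPC rejects calls when client and
server disagree on a VersionedProtocol's version; without it, a tracker
speaking the old Task/TaskStatus wire format would exchange unreadable bytes
instead of failing fast. A sketch of the check (getProtocolVersion() is part
of VersionedProtocol; doing it by hand like this is illustrative, the RPC
layer performs the equivalent):

  // proxy: an RPC client stub for InterTrackerProtocol (assumed in scope).
  long serverVersion = proxy.getProtocolVersion(
      InterTrackerProtocol.class.getName(), InterTrackerProtocol.versionID);
  if (serverVersion != InterTrackerProtocol.versionID) {
    throw new IOException("InterTrackerProtocol version mismatch: client="
        + InterTrackerProtocol.versionID + ", server=" + serverVersion);
  }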

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Thu Feb  5 17:39:27 2009
@@ -180,9 +180,9 @@
     FileSystem local = FileSystem.getLocal(conf);
     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
     File workDirName = new File(lDirAlloc.getLocalPathToRead(
-                                  TaskTracker.getJobCacheSubdir() 
-                                  + Path.SEPARATOR + taskId.getJobID() 
-                                  + Path.SEPARATOR + taskId
+                                  TaskTracker.getLocalTaskDir(
+                                    taskId.getJobID().toString(), 
+                                    taskId.toString())
                                   + Path.SEPARATOR + "work",
                                   conf). toString());
     local.setWorkingDirectory(new Path(workDirName.toString()));

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java Thu Feb  5 17:39:27 2009
@@ -1272,7 +1272,8 @@
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
-                                      String.valueOf(httpPort)}); 
+                                      httpPort == -1 ? "" : 
+                                        String.valueOf(httpPort)}); 
         }
       }
     }
@@ -1468,7 +1469,8 @@
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
-                                      String.valueOf(httpPort)}); 
+                                      httpPort == -1 ? "" : 
+                                        String.valueOf(httpPort)}); 
         }
       }
     }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Feb  5 17:39:27 2009
@@ -44,6 +44,7 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.StringUtils;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -113,6 +114,12 @@
 
   // A set of running reduce TIPs
   Set<TaskInProgress> runningReduces;
+  
+  // A list of cleanup tasks for the map task attempts, to be launched
+  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
+  
+  // A list of cleanup tasks for the reduce task attempts, to be launched
+  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
 
   private final int maxLevel;
 
@@ -473,12 +480,12 @@
     // Just assign splits[0]
     cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
             jobtracker, conf, this, numMapTasks);
-    cleanup[0].setCleanupTask();
+    cleanup[0].setJobCleanupTask();
 
     // cleanup reduce tip.
     cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks, jobtracker, conf, this);
-    cleanup[1].setCleanupTask();
+    cleanup[1].setJobCleanupTask();
 
     // create two setup tips, one map and one reduce.
     setup = new TaskInProgress[2];
@@ -486,12 +493,12 @@
     // Just assign splits[0]
     setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
             jobtracker, conf, this, numMapTasks + 1 );
-    setup[0].setSetupTask();
+    setup[0].setJobSetupTask();
 
     // setup reduce tip.
     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks + 1, jobtracker, conf, this);
-    setup[1].setSetupTask();
+    setup[1].setJobSetupTask();
     
     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
@@ -743,11 +750,27 @@
     if (wasComplete && (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
       status.setRunState(TaskStatus.State.KILLED);
     }
+    
+    // If the job is complete and a task has just reported its 
+    // state as FAILED_UNCLEAN/KILLED_UNCLEAN, 
+    // make the task's state FAILED/KILLED without launching cleanup attempt.
+    // Note that if task is already a cleanup attempt, 
+    // we don't change the state to make sure the task gets a killTaskAction
+    if ((this.isComplete() || jobFailed || jobKilled) && 
+        !tip.isCleanupAttempt(taskid)) {
+      if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+        status.setRunState(TaskStatus.State.FAILED);
+      } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
+        status.setRunState(TaskStatus.State.KILLED);
+      }
+    }
+    
     boolean change = tip.updateStatus(status);
     if (change) {
       TaskStatus.State state = status.getRunState();
+      // get the TaskTrackerStatus where the task ran 
       TaskTrackerStatus ttStatus = 
-        this.jobtracker.getTaskTracker(status.getTaskTracker());
+        this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
       String httpTaskLogLocation = null; 
 
       if (null != ttStatus){
@@ -768,8 +791,8 @@
                                             taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap() &&
-                                            !tip.isCleanupTask() &&
-                                            !tip.isSetupTask(),
+                                            !tip.isJobCleanupTask() &&
+                                            !tip.isJobSetupTask(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                            );
@@ -783,6 +806,15 @@
           tip.doCommit(taskid);
         }
         return;
+      } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
+                 state == TaskStatus.State.KILLED_UNCLEAN) {
+        tip.incompleteSubTask(taskid, this.status);
+        // add this task, to be rescheduled as cleanup attempt
+        if (tip.isMapTask()) {
+          mapCleanupTasks.add(taskid);
+        } else {
+          reduceCleanupTasks.add(taskid);
+        }
       }
       //For a failed task update the JT datastructures. 
       else if (state == TaskStatus.State.FAILED ||
@@ -813,8 +845,8 @@
                                             taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap() &&
-                                            !tip.isCleanupTask() &&
-                                            !tip.isSetupTask(),
+                                            !tip.isJobCleanupTask() &&
+                                            !tip.isJobSetupTask(),
                                             taskCompletionStatus, 
                                             httpTaskLogLocation
                                            );
@@ -843,7 +875,7 @@
                  oldProgress + " to " + tip.getProgress());
     }
     
-    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
       double progressDelta = tip.getProgress() - oldProgress;
       if (tip.isMapTask()) {
         if (maps.length == 0) {
@@ -941,6 +973,40 @@
     return result;
   }    
 
+  /*
+   * Return task cleanup attempt if any, to run on a given tracker
+   */
+  public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
+                                                 boolean isMapSlot)
+  throws IOException {
+    if (this.status.getRunState() != JobStatus.RUNNING || 
+        jobFailed || jobKilled) {
+      return null;
+    }
+    
+    String taskTracker = tts.getTrackerName();
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return null;
+    }
+    TaskAttemptID taskid = null;
+    TaskInProgress tip = null;
+    if (isMapSlot) {
+      if (!mapCleanupTasks.isEmpty()) {
+        taskid = mapCleanupTasks.remove(0);
+        tip = maps[taskid.getTaskID().getId()];
+      }
+    } else {
+      if (!reduceCleanupTasks.isEmpty()) {
+        taskid = reduceCleanupTasks.remove(0);
+        tip = reduces[taskid.getTaskID().getId()];
+      }
+    }
+    if (tip != null) {
+      return tip.addRunningTask(taskid, taskTracker, true);
+    }
+    return null;
+  }
+  
   public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
                                                      int clusterSize, 
                                                      int numUniqueHosts)
@@ -991,7 +1057,7 @@
    * Return a CleanupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public Task obtainCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainJobCleanupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
@@ -1001,7 +1067,7 @@
     }
     
     synchronized(this) {
-      if (!canLaunchCleanupTask()) {
+      if (!canLaunchJobCleanupTask()) {
         return null;
       }
       
@@ -1042,7 +1108,7 @@
    * or all maps and reduces are complete
    * @return true/false
    */
-  private synchronized boolean canLaunchCleanupTask() {
+  private synchronized boolean canLaunchJobCleanupTask() {
     if (!tasksInited.get()) {
       return false;
     }
@@ -1073,7 +1139,7 @@
    * Return a SetupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public Task obtainSetupTask(TaskTrackerStatus tts, 
+  public Task obtainJobSetupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
@@ -1197,10 +1263,10 @@
     String name;
     String splits = "";
     Enum counter = null;
-    if (tip.isSetupTask()) {
+    if (tip.isJobSetupTask()) {
       launchedSetup = true;
       name = Values.SETUP.name();
-    } else if (tip.isCleanupTask()) {
+    } else if (tip.isJobCleanupTask()) {
       launchedCleanup = true;
       name = Values.CLEANUP.name();
     } else if (tip.isMapTask()) {
@@ -1223,7 +1289,7 @@
       JobHistory.Task.logStarted(tip.getTIPId(), name,
                                  tip.getExecStartTime(), splits);
     }
-    if (!tip.isSetupTask() && !tip.isCleanupTask()) {
+    if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       jobCounters.incrCounter(counter, 1);
     }
     
@@ -1244,7 +1310,7 @@
     //
     // So to simplify, increment the data locality counter whenever there is 
     // data locality.
-    if (tip.isMapTask() && !tip.isSetupTask() && !tip.isCleanupTask()) {
+    if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       // increment the data locality counter for maps
       Node tracker = jobtracker.getNode(tts.getHost());
       int level = this.maxLevel;
@@ -1924,8 +1990,8 @@
     TaskTrackerStatus ttStatus = 
       this.jobtracker.getTaskTracker(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
-    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
-                      tip.isSetupTask() ? Values.SETUP.name() :
+    String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isJobSetupTask() ? Values.SETUP.name() :
                       tip.isMapTask() ? Values.MAP.name() : 
                       Values.REDUCE.name();
     if (status.getIsMap()){
@@ -1955,14 +2021,14 @@
                                 status.getCounters()); 
         
     int newNumAttempts = tip.getActiveTasks().size();
-    if (tip.isSetupTask()) {
+    if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
       // Job can start running now.
       this.status.setSetupProgress(1.0f);
       this.status.setRunState(JobStatus.RUNNING);
       JobHistory.JobInfo.logStarted(profile.getJobID());
-    } else if (tip.isCleanupTask()) {
+    } else if (tip.isJobCleanupTask()) {
       // cleanup task has finished. Kill the extra cleanup tip
       if (tip.isMapTask()) {
         // kill the reduce tip
@@ -2097,6 +2163,8 @@
         }
         jobKilled = true;
       }
+      // clear all unclean tasks
+      clearUncleanTasks();
       //
       // kill all TIPs.
       //
@@ -2111,7 +2179,22 @@
       }
     }
   }
-  
+
+  private void clearUncleanTasks() {
+    TaskAttemptID taskid = null;
+    TaskInProgress tip = null;
+    while (!mapCleanupTasks.isEmpty()) {
+      taskid = mapCleanupTasks.remove(0);
+      tip = maps[taskid.getTaskID().getId()];
+      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+    }
+    while (!reduceCleanupTasks.isEmpty()) {
+      taskid = reduceCleanupTasks.remove(0);
+      tip = reduces[taskid.getTaskID().getId()];
+      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+    }
+  }
+
   /**
    * Kill the job and all its component tasks. This method is called from 
    * jobtracker and should return fast as it locks the jobtracker.
@@ -2162,16 +2245,16 @@
     boolean wasFailed = tip.isFailed();
 
     // Mark the taskid as FAILED or KILLED
-    tip.incompleteSubTask(taskid, taskTrackerStatus, this.status);
+    tip.incompleteSubTask(taskid, this.status);
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
         
     //update running  count on task failure.
     if (wasRunning && !isRunning) {
-      if (tip.isCleanupTask()) {
+      if (tip.isJobCleanupTask()) {
         launchedCleanup = false;
-      } else if (tip.isSetupTask()) {
+      } else if (tip.isJobSetupTask()) {
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
@@ -2208,43 +2291,48 @@
     }
         
     // update job history
-    String taskTrackerName = taskTrackerStatus.getHost();
-    long finishTime = status.getFinishTime();
-    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
-                      tip.isSetupTask() ? Values.SETUP.name() :
+    // get taskStatus from tip
+    TaskStatus taskStatus = tip.getTaskStatus(taskid);
+    String taskTrackerName = taskStatus.getTaskTracker();
+    String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
+    int taskTrackerPort = -1;
+    if (taskTrackerStatus != null) {
+      taskTrackerPort = taskTrackerStatus.getHttpPort();
+    }
+    long startTime = taskStatus.getStartTime();
+    long finishTime = taskStatus.getFinishTime();
+    List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
+    String diagInfo = taskDiagnosticInfo == null ? "" :
+      StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
+    String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isJobSetupTask() ? Values.SETUP.name() :
                       tip.isMapTask() ? Values.MAP.name() : 
                       Values.REDUCE.name();
-    if (status.getIsMap()) {
-      JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-          status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          taskType);
-      if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), 
-                taskType);
+    if (taskStatus.getIsMap()) {
+      JobHistory.MapAttempt.logStarted(taskid, startTime, 
+        taskTrackerName, taskTrackerPort, taskType);
+      if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
+        JobHistory.MapAttempt.logFailed(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       } else {
-        JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(),
-                taskType);
+        JobHistory.MapAttempt.logKilled(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       }
     } else {
-      JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-          status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          taskType);
-      if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), 
-                taskType);
+      JobHistory.ReduceAttempt.logStarted(taskid, startTime, 
+        taskTrackerName, taskTrackerPort, taskType);
+      if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
+        JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       } else {
-        JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), 
-                taskType);
+        JobHistory.ReduceAttempt.logKilled(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       }
     }
         
     // After this, try to assign tasks with the one after this, so that
     // the failed task goes to the end of the list.
-    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
       if (tip.isMapTask()) {
         failedMapTasks++;
       } else {
@@ -2256,7 +2344,7 @@
     // Note down that a task has failed on this tasktracker 
     //
     if (status.getRunState() == TaskStatus.State.FAILED) { 
-      addTrackerTaskFailure(taskTrackerStatus.getTrackerName());
+      addTrackerTaskFailure(taskTrackerName);
     }
         
     //
@@ -2274,7 +2362,7 @@
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
       //
-      boolean killJob = tip.isCleanupTask() || tip.isSetupTask() ? true :
+      boolean killJob = tip.isJobCleanupTask() || tip.isJobSetupTask() ? true :
                         tip.isMapTask() ? 
             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
@@ -2283,9 +2371,9 @@
         LOG.info("Aborting job " + profile.getJobID());
         JobHistory.Task.logFailed(tip.getTIPId(), 
                                   taskType,  
-                                  status.getFinishTime(), 
-                                  status.getDiagnosticInfo());
-        if (tip.isCleanupTask()) {
+                                  finishTime, 
+                                  diagInfo);
+        if (tip.isJobCleanupTask()) {
           // kill the other tip
           if (tip.isMapTask()) {
             cleanup[1].kill();
@@ -2294,7 +2382,7 @@
           }
           terminateJob(JobStatus.FAILED);
         } else {
-          if (tip.isSetupTask()) {
+          if (tip.isJobSetupTask()) {
             // kill the other tip
             killSetupTip(!tip.isMapTask());
           }
@@ -2305,7 +2393,7 @@
       //
       // Update the counters
       //
-      if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+      if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
         if (tip.isMapTask()) {
           jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
         } else {
@@ -2344,8 +2432,8 @@
     status.setFinishTime(System.currentTimeMillis());
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(tip.getTIPId(), 
-                              tip.isCleanupTask() ? Values.CLEANUP.name() : 
-                              tip.isSetupTask() ? Values.SETUP.name() : 
+                              tip.isJobCleanupTask() ? Values.CLEANUP.name() : 
+                              tip.isJobSetupTask() ? Values.SETUP.name() : 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
                               tip.getExecFinishTime(), reason, taskid); 
   }

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Feb  5 17:39:27 2009
@@ -1654,7 +1654,10 @@
   // and TaskInProgress
   ///////////////////////////////////////////////////////
   void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) {
-    LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'");
+    LOG.info("Adding task " + 
+      (tip.isCleanupAttempt(taskid) ? "(cleanup)" : "") + 
+      "'"  + taskid + "' to tip " + 
+      tip.getTIPId() + ", for tracker '" + taskTracker + "'");
 
     // taskid --> tracker
     taskidToTrackerMap.put(taskid, taskTracker);
@@ -1736,6 +1739,8 @@
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+            taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
@@ -1746,6 +1751,8 @@
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+            taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
@@ -2559,7 +2566,7 @@
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+          t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
                                     numUniqueHosts, true);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2568,7 +2575,15 @@
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+          t = job.obtainTaskCleanupTask(taskTracker, true);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
                                   numUniqueHosts, true);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2579,7 +2594,7 @@
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+          t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
                                     numUniqueHosts, false);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2588,7 +2603,15 @@
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+          t = job.obtainTaskCleanupTask(taskTracker, false);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
                                     numUniqueHosts, false);
           if (t != null) {
             return Collections.singletonList(t);
@@ -3137,7 +3160,7 @@
         // And completed maps with zero reducers of the job 
         // never need to be failed. 
         if (!tip.isComplete() || 
-            (tip.isMapTask() && !tip.isSetupTask() && 
+            (tip.isMapTask() && !tip.isJobSetupTask() && 
              job.desiredReduces() != 0)) {
           // if the job is done, we don't want to change anything
           if (job.getStatus().getRunState() == JobStatus.RUNNING ||
@@ -3146,7 +3169,10 @@
                            (tip.isMapTask() ? 
                                TaskStatus.Phase.MAP : 
                                TaskStatus.Phase.REDUCE), 
-                           TaskStatus.State.KILLED, trackerName, myInstrumentation);
+                            tip.isRunningTask(taskId) ? 
+                              TaskStatus.State.KILLED_UNCLEAN : 
+                              TaskStatus.State.KILLED,
+                            trackerName, myInstrumentation);
             jobsWithFailures.add(job);
           }
         } else {
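
Reading the two heartbeat hunks above together, each slot type now hands out
work in a fixed priority order: job-cleanup tasks first, then the new
task-cleanup attempts, then job-setup tasks, then ordinary maps or reduces.
Condensed for the map-slot branch (method names as in the patch; the iterator
loops are abbreviated to enhanced-for for illustration):

  // Inside the heartbeat's task-assignment path, map slots only.
  for (JobInProgress job : jobs.values()) {
    Task t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers, numUniqueHosts, true);
    if (t != null) return Collections.singletonList(t);
  }
  for (JobInProgress job : jobs.values()) {
    Task t = job.obtainTaskCleanupTask(taskTracker, true);   // new in this patch
    if (t != null) return Collections.singletonList(t);
  }
  for (JobInProgress job : jobs.values()) {
    Task t = job.obtainJobSetupTask(taskTracker, numTaskTrackers, numUniqueHosts, true);
    if (t != null) return Collections.singletonList(t);
  }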

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java Thu Feb  5 17:39:27 2009
@@ -43,8 +43,6 @@
 
   JvmManagerForType reduceJvmManager;
   
-  TaskTracker tracker;
-
   public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
       File stdout,File stderr,long logSize, File workDir, 
       Map<String,String> env, String pidFile, JobConf conf) {
@@ -53,10 +51,9 @@
   
   public JvmManager(TaskTracker tracker) {
     mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
-        true, tracker);
+        true);
     reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
-        false, tracker);
-    this.tracker = tracker;
+        false);
   }
   
   public void stop() {
@@ -74,9 +71,9 @@
 
   public void launchJvm(TaskRunner t, JvmEnv env) {
     if (t.getTask().isMapTask()) {
-      mapJvmManager.reapJvm(t, tracker, env);
+      mapJvmManager.reapJvm(t, env);
     } else {
-      reduceJvmManager.reapJvm(t, tracker, env);
+      reduceJvmManager.reapJvm(t, env);
     }
   }
 
@@ -125,12 +122,10 @@
     boolean isMap;
     
     Random rand = new Random(System.currentTimeMillis());
-    TaskTracker tracker;
 
-    public JvmManagerForType(int maxJvms, boolean isMap, TaskTracker tracker) {
+    public JvmManagerForType(int maxJvms, boolean isMap) {
       this.maxJvms = maxJvms;
       this.isMap = isMap;
-      this.tracker = tracker;
     }
 
     synchronized public void setRunningTaskForJvm(JVMId jvmId, 
@@ -194,7 +189,7 @@
       jvmIdToRunner.remove(jvmId);
     }
     private synchronized void reapJvm( 
-        TaskRunner t, TaskTracker tracker, JvmEnv env) {
+        TaskRunner t, JvmEnv env) {
       if (t.getTaskInProgress().wasKilled()) {
         //the task was killed in-flight
         //no need to do the rest of the operations
@@ -251,7 +246,7 @@
           LOG.info("Killing JVM: " + runnerToKill.jvmId);
           runnerToKill.kill();
         }
-        spawnNewJvm(jobId, env, tracker, t);
+        spawnNewJvm(jobId, env, t);
         return;
       }
       //*MUST* never reach this
@@ -281,7 +276,7 @@
       return details.toString();
     }
 
-    private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker, 
+    private void spawnNewJvm(JobID jobId, JvmEnv env,  
         TaskRunner t) {
       JvmRunner jvmRunner = new JvmRunner(env,jobId);
       jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
@@ -293,11 +288,6 @@
       //tasks. Doing it this way also keeps code simple.
       jvmRunner.setDaemon(true);
       jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
-      if (tracker.isTaskMemoryManagerEnabled()) {
-        tracker.getTaskMemoryManager().addTask(
-            TaskAttemptID.forName(env.conf.get("mapred.task.id")),
-            tracker.getVirtualMemoryForTask(env.conf));
-      }
       setRunningTaskForJvm(jvmRunner.jvmId, t);
       LOG.info(jvmRunner.getName());
       jvmRunner.start();
@@ -355,6 +345,7 @@
           LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
               numTasksRan);
           try {
+            // In case of jvm-reuse,
             //the task jvm cleans up the common workdir for every 
             //task at the beginning of each task in the task JVM.
             //For the last task, we do it here.
@@ -362,12 +353,6 @@
               FileUtil.fullyDelete(env.workDir);
             }
           } catch (IOException ie){}
-          if (tracker.isTaskMemoryManagerEnabled()) {
-          // Remove the associated pid-file, if any
-            tracker.getTaskMemoryManager().
-               removePidFile(TaskAttemptID.forName(
-                   env.conf.get("mapred.task.id")));
-          }
         }
       }
 

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java Thu Feb  5 17:39:27 2009
@@ -30,13 +30,13 @@
 class MapOutputFile {
 
   private JobConf conf;
-  private String jobDir;
+  private JobID jobId;
   
   MapOutputFile() {
   }
 
   MapOutputFile(JobID jobId) {
-    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+    this.jobId = jobId;
   }
 
   private LocalDirAllocator lDirAlloc = 
@@ -47,9 +47,9 @@
    */
   public Path getOutputFile(TaskAttemptID mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/file.out", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out", conf);
   }
 
   /** Create a local map output file name.
@@ -58,9 +58,9 @@
    */
   public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/file.out", size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out", size, conf);
   }
 
   /** Return the path to a local map output index file created earlier
@@ -68,9 +68,9 @@
    */
   public Path getOutputIndexFile(TaskAttemptID mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/file.out.index", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out.index", conf);
   }
 
   /** Create a local map output index file name.
@@ -79,10 +79,10 @@
    */
   public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/file.out.index", 
-                                          size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out.index", 
+                       size, conf);
   }
 
   /** Return a local map spill file created earlier.
@@ -91,10 +91,10 @@
    */
   public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/spill" 
-                                        + spillNumber + ".out", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" 
+                       + spillNumber + ".out", conf);
   }
 
   /** Create a local map spill file name.
@@ -104,10 +104,10 @@
    */
   public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber, 
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/spill" + 
-                                          spillNumber + ".out", size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" + 
+                       spillNumber + ".out", size, conf);
   }
 
   /** Return a local map spill index file created earlier
@@ -116,10 +116,10 @@
    */
   public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/spill" + 
-                                        spillNumber + ".out.index", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" + 
+                       spillNumber + ".out.index", conf);
   }
 
   /** Create a local map spill index file name.
@@ -129,10 +129,10 @@
    */
   public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/spill" + spillNumber + 
-                                          ".out.index", size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" + spillNumber + 
+                       ".out.index", size, conf);
   }
 
   /** Return a local reduce input file created earlier
@@ -142,10 +142,10 @@
   public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        reduceTaskId + Path.SEPARATOR + 
-                                        "output" + "/map_" + mapId + ".out",
-                                        conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), reduceTaskId.toString())
+                       + "/map_" + mapId + ".out",
+                       conf);
   }
 
   /** Create a local reduce input file name.
@@ -157,17 +157,17 @@
                                    long size)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          reduceTaskId + Path.SEPARATOR +
-                                          ("output" + "/map_" + mapId.getId() + 
-                                           ".out"), 
-                                          size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), reduceTaskId.toString())
+                       + "/map_" + mapId.getId() + ".out", 
+                       size, conf);
   }
 
   /** Removes all of the files related to a task. */
   public void removeAll(TaskAttemptID taskId) throws IOException {
-    conf.deleteLocalFiles(jobDir + Path.SEPARATOR +
-                          taskId + Path.SEPARATOR + "output");
+    conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
+                          jobId.toString(), taskId.toString())
+);
   }
 
   public void setConf(Configuration conf) {
@@ -179,7 +179,7 @@
   }
   
   public void setJobId(JobID jobId) {
-    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+    this.jobId = jobId;
   }
 
 }
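
All of the path builders above now delegate to a single
TaskTracker.getIntermediateOutputDir(jobId, attemptId) helper instead of
concatenating jobcache paths by hand, so the attempt-local layout is defined
in one place. Judging from the strings the old code assembled, the helper
presumably reduces to something like this (a guess at the layout, not the
committed implementation):

  // Assumed layout, reconstructed from the replaced concatenations:
  //   jobcache/<job-id>/<attempt-id>/output
  static String getIntermediateOutputDir(String jobId, String attemptId) {
    return getLocalTaskDir(jobId, attemptId) + Path.SEPARATOR + "output";
  }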

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu Feb  5 17:39:27 2009
@@ -103,13 +103,15 @@
   @Override
   public void localizeConfiguration(JobConf conf) throws IOException {
     super.localizeConfiguration(conf);
-    Path localSplit = new Path(new Path(getJobFile()).getParent(), 
-                               "split.dta");
-    LOG.debug("Writing local split to " + localSplit);
-    DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
-    Text.writeString(out, splitClass);
-    split.write(out);
-    out.close();
+    if (isMapOrReduce()) {
+      Path localSplit = new Path(new Path(getJobFile()).getParent(), 
+                                 "split.dta");
+      LOG.debug("Writing local split to " + localSplit);
+      DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
+      Text.writeString(out, splitClass);
+      split.write(out);
+      out.close();
+    }
   }
   
   @Override
@@ -121,16 +123,20 @@
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    Text.writeString(out, splitClass);
-    split.write(out);
-    split = null;
+    if (isMapOrReduce()) {
+      Text.writeString(out, splitClass);
+      split.write(out);
+      split = null;
+    }
   }
   
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    splitClass = Text.readString(in);
-    split.readFields(in);
+    if (isMapOrReduce()) {
+      splitClass = Text.readString(in);
+      split.readFields(in);
+    }
   }
 
   /**
@@ -280,12 +286,16 @@
     initialize(job, getJobID(), reporter, useNewApi);
 
     // check if it is a cleanupJobTask
-    if (cleanupJob) {
-      runCleanup(umbilical, reporter);
+    if (jobCleanup) {
+      runJobCleanupTask(umbilical, reporter);
+      return;
+    }
+    if (jobSetup) {
+      runJobSetupTask(umbilical, reporter);
       return;
     }
-    if (setupJob) {
-      runSetupJob(umbilical, reporter);
+    if (taskCleanup) {
+      runTaskCleanupTask(umbilical, reporter);
       return;
     }
 

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Feb  5 17:39:27 2009
@@ -343,7 +343,7 @@
     throws IOException, InterruptedException, ClassNotFoundException {
     job.setBoolean("mapred.skip.on", isSkipping());
 
-    if (!cleanupJob && !setupJob) {
+    if (isMapOrReduce()) {
       copyPhase = getProgress().addPhase("copy");
       sortPhase  = getProgress().addPhase("sort");
       reducePhase = getProgress().addPhase("reduce");
@@ -355,12 +355,16 @@
     initialize(job, getJobID(), reporter, useNewApi);
 
     // check if it is a cleanupJobTask
-    if (cleanupJob) {
-      runCleanup(umbilical, reporter);
+    if (jobCleanup) {
+      runJobCleanupTask(umbilical, reporter);
       return;
     }
-    if (setupJob) {
-      runSetupJob(umbilical, reporter);
+    if (jobSetup) {
+      runJobSetupTask(umbilical, reporter);
+      return;
+    }
+    if (taskCleanup) {
+      runTaskCleanupTask(umbilical, reporter);
       return;
     }
     
@@ -383,6 +387,7 @@
     }
     copyPhase.complete();                         // copy is already complete
     setPhase(TaskStatus.Phase.SORT);
+    statusUpdate(umbilical);
 
     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
     RawKeyValueIterator rIter = isLocal
@@ -398,6 +403,7 @@
     
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
+    statusUpdate(umbilical);
     Class keyClass = job.getMapOutputKeyClass();
     Class valueClass = job.getMapOutputValueClass();
     RawComparator comparator = job.getOutputValueGroupingComparator();
@@ -1243,10 +1249,10 @@
         // else, we will check the localFS to find a suitable final location
         // for this path
         TaskAttemptID reduceId = reduceTask.getTaskID();
-        Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() +
-                                 Path.SEPARATOR + getTaskID().getJobID() +
-                                 Path.SEPARATOR + reduceId +
-                                 Path.SEPARATOR + "output" + "/map_" +
+        Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
+                                 reduceId.getJobID().toString(),
+                                 reduceId.toString()) 
+                                 + "/map_" +
                                  loc.getTaskId().getId() + ".out");
         
         // Copy the map output to a temp file whose name is unique to this attempt 

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Thu Feb  5 17:39:27 2009
@@ -110,8 +110,9 @@
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
-  protected boolean cleanupJob = false;
-  protected boolean setupJob = false;
+  protected boolean jobCleanup = false;
+  protected boolean jobSetup = false;
+  protected boolean taskCleanup = false;
   
   //skip ranges based on failed ranges from previous attempts
   private SortedRanges skipRanges = new SortedRanges();
@@ -131,8 +132,8 @@
   protected TaskAttemptContext taskContext;
   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
-  private volatile boolean commitPending = false;
   protected final Counters.Counter spilledRecordsCounter;
+  private String pidFile = "";
 
   ////////////////////////////////////////////
   // Constructors
@@ -168,6 +169,12 @@
   public String getJobFile() { return jobFile; }
   public TaskAttemptID getTaskID() { return taskId; }
   Counters getCounters() { return counters; }
+  public void setPidFile(String pidFile) { 
+    this.pidFile = pidFile; 
+  }
+  public String getPidFile() { 
+    return pidFile; 
+  }
   
   /**
    * Get the job name for this task.
@@ -244,15 +251,50 @@
   }
 
   /**
-   * Sets whether the task is cleanup task
+   * Return current state of the task. 
+   * needs to be synchronized as communication thread 
+   * sends the state every second
+   * @return
+   */
+  synchronized TaskStatus.State getState(){
+    return this.taskStatus.getRunState(); 
+  }
+  /**
+   * Sets the current state of the task.
+   * @param state the new run state
    */
-  public void setCleanupTask() {
-    cleanupJob = true;
+  synchronized void setState(TaskStatus.State state){
+    this.taskStatus.setRunState(state); 
   }
 
-  public void setSetupTask() {
-    setupJob = true; 
+  void setTaskCleanupTask() {
+    taskCleanup = true;
   }
+
+  boolean isTaskCleanupTask() {
+    return taskCleanup;
+  }
+
+  boolean isJobCleanupTask() {
+    return jobCleanup;
+  }
+
+  boolean isJobSetupTask() {
+    return jobSetup;
+  }
+
+  void setJobSetupTask() {
+    jobSetup = true; 
+  }
+
+  void setJobCleanupTask() {
+    jobCleanup = true; 
+  }
+
+  boolean isMapOrReduce() {
+    return !jobSetup && !jobCleanup && !taskCleanup;
+  }
+  
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -264,10 +306,13 @@
     taskStatus.write(out);
     skipRanges.write(out);
     out.writeBoolean(skipping);
-    out.writeBoolean(cleanupJob);
-    out.writeBoolean(setupJob);
+    out.writeBoolean(jobCleanup);
+    out.writeBoolean(jobSetup);
     out.writeBoolean(writeSkipRecs);
+    out.writeBoolean(taskCleanup);  
+    Text.writeString(out, pidFile);
   }
+  
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
     taskId = TaskAttemptID.read(in);
@@ -278,9 +323,14 @@
     currentRecIndexIterator = skipRanges.skipRangeIterator();
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
-    cleanupJob = in.readBoolean();
-    setupJob = in.readBoolean();
+    jobCleanup = in.readBoolean();
+    jobSetup = in.readBoolean();
     writeSkipRecs = in.readBoolean();
+    taskCleanup = in.readBoolean();
+    if (taskCleanup) {
+      setPhase(TaskStatus.Phase.CLEANUP);
+    }
+    pidFile = Text.readString(in);
   }
 
   @Override
@@ -332,6 +382,9 @@
                                                    InterruptedException {
     jobContext = new JobContext(job, id, reporter);
     taskContext = new TaskAttemptContext(job, taskId, reporter);
+    if (getState() == TaskStatus.State.UNASSIGNED) {
+      setState(TaskStatus.State.RUNNING);
+    }
     if (useNewApi) {
       LOG.debug("using new api for output committer");
       outputFormat =
@@ -461,17 +514,10 @@
           if (sendProgress) {
             // we need to send progress update
             updateCounters();
-            if (commitPending) {
-              taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
-                                      taskProgress.get(),
-                                      taskProgress.toString(), 
-                                      counters);
-            } else {
-              taskStatus.statusUpdate(TaskStatus.State.RUNNING,
-                                      taskProgress.get(),
-                                      taskProgress.toString(), 
-                                      counters);
-            }
+            taskStatus.statusUpdate(getState(),
+                                    taskProgress.get(),
+                                    taskProgress.toString(), 
+                                    counters);
             taskFound = umbilical.statusUpdate(taskId, taskStatus);
             taskStatus.clearStatus();
           }
@@ -604,8 +650,7 @@
     boolean commitRequired = committer.needsTaskCommit(taskContext);
     if (commitRequired) {
       int retries = MAX_RETRIES;
-      taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
-      commitPending = true;
+      setState(TaskStatus.State.COMMIT_PENDING);
      // tell the tasktracker that the task is commit pending
       while (true) {
         try {
@@ -631,37 +676,21 @@
     sendDone(umbilical);
   }
 
-  private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
+  protected void statusUpdate(TaskUmbilicalProtocol umbilical) 
   throws IOException {
-    //first wait for the COMMIT approval from the tasktracker
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        // send a final status report
-        if (commitPending) {
-          taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
-                                  taskProgress.get(),
-                                  taskProgress.toString(), 
-                                  counters);
-        } else {
-          taskStatus.statusUpdate(TaskStatus.State.RUNNING,
-                                  taskProgress.get(),
-                                  taskProgress.toString(), 
-                                  counters);
-        }
-
-        try {
-          if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
-            LOG.warn("Parent died.  Exiting "+taskId);
-            System.exit(66);
-          }
-          taskStatus.clearStatus();
-          return;
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt(); // interrupt ourself
+        if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+          LOG.warn("Parent died.  Exiting "+taskId);
+          System.exit(66);
         }
+        taskStatus.clearStatus();
+        return;
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt(); // interrupt ourselves
       } catch (IOException ie) {
-        LOG.warn("Failure sending last status update: " + 
+        LOG.warn("Failure sending status update: " + 
                   StringUtils.stringifyException(ie));
         if (--retries == 0) {
           throw ie;
@@ -669,6 +698,16 @@
       }
     }
   }
+  
+  private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // send a final status report
+    taskStatus.statusUpdate(getState(),
+                            taskProgress.get(),
+                            taskProgress.toString(), 
+                            counters);
+    statusUpdate(umbilical);
+  }
 
   private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
     int retries = MAX_RETRIES;
@@ -735,18 +774,37 @@
     }
   }
 
-  protected void runCleanup(TaskUmbilicalProtocol umbilical,
-                            TaskReporter reporter
-                            ) throws IOException, InterruptedException {
+  protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical,
+                                TaskReporter reporter) 
+  throws IOException, InterruptedException {
+    taskCleanup(umbilical);
+    done(umbilical, reporter);
+  }
+
+  void taskCleanup(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // set phase for this task
+    setPhase(TaskStatus.Phase.CLEANUP);
+    getProgress().setStatus("cleanup");
+    statusUpdate(umbilical);
+    LOG.info("Runnning cleanup for the task");
+    // do the cleanup
+    discardOutput(taskContext);
+  }
+
+  protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical,
+                               TaskReporter reporter
+                              ) throws IOException, InterruptedException {
     // set phase for this task
     setPhase(TaskStatus.Phase.CLEANUP);
     getProgress().setStatus("cleanup");
+    statusUpdate(umbilical);
     // do the cleanup
     committer.cleanupJob(jobContext);
     done(umbilical, reporter);
   }
 
-  protected void runSetupJob(TaskUmbilicalProtocol umbilical,
+  protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
                              TaskReporter reporter
                              ) throws IOException, InterruptedException {
     // do the setup

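The Task.java changes above replace the single commitPending flag with an explicit run state, rename cleanupJob/setupJob to jobCleanup/jobSetup, and add a third taskCleanup flag plus a pidFile, both of which travel with the serialized task. A minimal round-trip sketch of just the new fields: writeUTF stands in for Hadoop's Text.writeString, the class names are invented, and the real Task serializes much more (job file, attempt id, skip ranges, and so on):

import java.io.*;

class TaskFlags {
  boolean jobSetup, jobCleanup, taskCleanup;
  String pidFile = "";

  void write(DataOutput out) throws IOException {
    out.writeBoolean(jobCleanup);
    out.writeBoolean(jobSetup);
    out.writeBoolean(taskCleanup);
    out.writeUTF(pidFile);      // stand-in for Text.writeString
  }

  void readFields(DataInput in) throws IOException {
    jobCleanup = in.readBoolean();
    jobSetup = in.readBoolean();
    taskCleanup = in.readBoolean();   // a cleanup attempt starts in CLEANUP phase
    pidFile = in.readUTF();
  }

  boolean isMapOrReduce() {           // as in the patch
    return !jobSetup && !jobCleanup && !taskCleanup;
  }
}

public class TaskFlagsDemo {
  public static void main(String[] args) throws IOException {
    TaskFlags t = new TaskFlags();
    t.taskCleanup = true;                       // mark a task-cleanup attempt
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    t.write(new DataOutputStream(buf));
    TaskFlags copy = new TaskFlags();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(copy.isMapOrReduce());   // false
  }
}
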
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Thu Feb  5 17:39:27 2009
@@ -82,8 +82,8 @@
   private long maxSkipRecords = 0;
   private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
-  private boolean cleanup = false; 
-  private boolean setup = false;
+  private boolean jobCleanup = false; 
+  private boolean jobSetup = false;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -103,8 +103,10 @@
   private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
     new TreeMap<TaskAttemptID,TaskStatus>();
 
-  // Map from taskId -> Task
-  private Map<TaskAttemptID, Task> tasks = new TreeMap<TaskAttemptID, Task>();
+  // Map from taskId -> TaskTracker Id, 
+  // contains cleanup attempts and where they ran, if any
+  private TreeMap<TaskAttemptID, String> cleanupTasks =
+    new TreeMap<TaskAttemptID, String>();
 
   private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
   private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
@@ -174,20 +176,20 @@
     return partition;
   }    
 
-  public boolean isCleanupTask() {
-   return cleanup;
+  public boolean isJobCleanupTask() {
+   return jobCleanup;
   }
   
-  public void setCleanupTask() {
-    cleanup = true;
+  public void setJobCleanupTask() {
+    jobCleanup = true;
   }
 
-  public boolean isSetupTask() {
-    return setup;
+  public boolean isJobSetupTask() {
+    return jobSetup;
   }
 	  
-  public void setSetupTask() {
-    setup = true;
+  public void setJobSetupTask() {
+    jobSetup = true;
   }
 
   public boolean isOnlyCommitPending() {
@@ -275,15 +277,6 @@
   }
     
   /**
-   * Return the Task object associated with a taskId
-   * @param taskId
-   * @return
-   */  
-  public Task getTask(TaskAttemptID taskId) {
-    return tasks.get(taskId);
-  }
-
-  /**
    * Is the Task associated with taskid the first attempt of the tip?
    * @param taskId
    * @return Returns true if the Task is the first attempt of the tip
@@ -392,7 +385,8 @@
       tasksReportedClosed.add(taskid);
       close = true;
     } else if (isComplete() && 
-               !(isMapTask() && !setup && !cleanup && isComplete(taskid)) &&
+               !(isMapTask() && !jobSetup && 
+                   !jobCleanup && isComplete(taskid)) &&
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
@@ -516,6 +510,8 @@
       // @see {@link TaskTracker.transmitHeartbeat()}
       if ((newState != TaskStatus.State.RUNNING && 
            newState != TaskStatus.State.COMMIT_PENDING && 
+           newState != TaskStatus.State.FAILED_UNCLEAN && 
+           newState != TaskStatus.State.KILLED_UNCLEAN && 
            newState != TaskStatus.State.UNASSIGNED) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
@@ -531,6 +527,8 @@
           newState == TaskStatus.State.UNASSIGNED) &&
           (oldState == TaskStatus.State.FAILED || 
            oldState == TaskStatus.State.KILLED || 
+           oldState == TaskStatus.State.FAILED_UNCLEAN || 
+           oldState == TaskStatus.State.KILLED_UNCLEAN || 
            oldState == TaskStatus.State.SUCCEEDED ||
            oldState == TaskStatus.State.COMMIT_PENDING)) {
         return false;
@@ -538,8 +536,17 @@
           
       changed = oldState != newState;
     }
-        
-    taskStatuses.put(taskid, status);
+    // if the task is a cleanup attempt, do not replace the complete status;
+    // update only specific fields.
+    // For example, startTime should not be updated,
+    // but finishTime has to be updated.
+    if (!isCleanupAttempt(taskid)) {
+      taskStatuses.put(taskid, status);
+    } else {
+      taskStatuses.get(taskid).statusUpdate(status.getRunState(),
+        status.getProgress(), status.getStateString(), status.getPhase(),
+        status.getFinishTime());
+    }
 
     // Recompute progress
     recomputeProgress();
@@ -551,29 +558,38 @@
    * has failed.
    */
   public void incompleteSubTask(TaskAttemptID taskid, 
-                                TaskTrackerStatus ttStatus,
                                 JobStatus jobStatus) {
     //
     // Note the failure and its location
     //
-    String trackerName = ttStatus.getTrackerName();
-    String trackerHostName = ttStatus.getHost();
-     
     TaskStatus status = taskStatuses.get(taskid);
+    String trackerName;
+    String trackerHostName = null;
     TaskStatus.State taskState = TaskStatus.State.FAILED;
     if (status != null) {
+      trackerName = status.getTaskTracker();
+      trackerHostName = 
+        JobInProgress.convertTrackerNameToHostName(trackerName);
       // Check if the user manually KILLED/FAILED this task-attempt...
       Boolean shouldFail = tasksToKill.remove(taskid);
       if (shouldFail != null) {
-        taskState = (shouldFail) ? TaskStatus.State.FAILED :
-                                   TaskStatus.State.KILLED;
+        if (isCleanupAttempt(taskid)) {
+          taskState = (shouldFail) ? TaskStatus.State.FAILED :
+                                     TaskStatus.State.KILLED;
+        } else {
+          taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN :
+                                     TaskStatus.State.KILLED_UNCLEAN;
+
+        }
         status.setRunState(taskState);
         addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
       }
  
       taskState = status.getRunState();
       if (taskState != TaskStatus.State.FAILED && 
-              taskState != TaskStatus.State.KILLED) {
+          taskState != TaskStatus.State.KILLED &&
+          taskState != TaskStatus.State.FAILED_UNCLEAN &&
+          taskState != TaskStatus.State.KILLED_UNCLEAN) {
         LOG.info("Task '" + taskid + "' running on '" + trackerName + 
                 "' in state: '" + taskState + "' being failed!");
         status.setRunState(TaskStatus.State.FAILED);
@@ -594,7 +610,7 @@
     // should note this failure only for completed maps, only if this taskid;
     // completed this map. however if the job is done, there is no need to 
     // manipulate completed maps
-    if (this.isMapTask() && !setup && !cleanup && isComplete(taskid) && 
+    if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) && 
         jobStatus.getRunState() != JobStatus.SUCCEEDED) {
       this.completes--;
       
@@ -614,7 +630,7 @@
           skipping = startSkipping();
         }
 
-      } else {
+      } else if (taskState == TaskStatus.State.KILLED) {
         numKilledTasks++;
       }
     }
@@ -741,6 +757,7 @@
     TaskStatus st = taskStatuses.get(taskId);
     if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
         || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+        st.inTaskCleanupPhase() ||
         st.getRunState() == TaskStatus.State.UNASSIGNED)
         && tasksToKill.put(taskId, shouldFail) == null ) {
       String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 
@@ -865,11 +882,17 @@
     return addRunningTask(taskid, taskTracker);
   }
   
+  public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
+    return addRunningTask(taskid, taskTracker, false);
+  }
+  
   /**
    * Adds a previously running task to this tip. This is used in case of 
    * jobtracker restarts.
    */
-  public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
+  public Task addRunningTask(TaskAttemptID taskid, 
+                             String taskTracker,
+                             boolean taskCleanup) {
     // create the task
     Task t = null;
     if (isMapTask()) {
@@ -880,11 +903,17 @@
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
-    if (cleanup) {
-      t.setCleanupTask();
+    if (jobCleanup) {
+      t.setJobCleanupTask();
+    }
+    if (jobSetup) {
+      t.setJobSetupTask();
     }
-    if (setup) {
-      t.setSetupTask();
+    if (taskCleanup) {
+      t.setTaskCleanupTask();
+      t.setState(taskStatuses.get(taskid).getRunState());
+      cleanupTasks.put(taskid, taskTracker);
+      jobtracker.removeTaskEntry(taskid);
     }
     t.setConf(conf);
     LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
@@ -893,7 +922,6 @@
     if(failedRanges.isTestAttempt()) {
       t.setWriteSkipRecs(false);
     }
-    tasks.put(taskid, t);
 
     activeTasks.put(taskid, taskTracker);
 
@@ -901,6 +929,23 @@
     jobtracker.createTaskEntry(taskid, taskTracker, this);
     return t;
   }
+
+  boolean isRunningTask(TaskAttemptID taskid) {
+    TaskStatus status = taskStatuses.get(taskid);
+    return status != null && status.getRunState() == TaskStatus.State.RUNNING;
+  }
+  
+  boolean isCleanupAttempt(TaskAttemptID taskid) {
+    return cleanupTasks.containsKey(taskid);
+  }
+  
+  String machineWhereCleanupRan(TaskAttemptID taskid) {
+    return cleanupTasks.get(taskid);
+  }
+  
+  String machineWhereTaskRan(TaskAttemptID taskid) {
+    return taskStatuses.get(taskid).getTaskTracker();
+  }
     
   /**
    * Has this task already failed on this machine?
@@ -987,7 +1032,7 @@
   }
 
   public long getMapInputSize() {
-    if(isMapTask() && !setup && !cleanup) {
+    if(isMapTask() && !jobSetup && !jobCleanup) {
       return rawSplit.getDataLength();
     } else {
       return 0;

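TaskInProgress now distinguishes attempts that died before cleaning up their output (FAILED_UNCLEAN/KILLED_UNCLEAN) from attempts that are fully done, and remembers where each cleanup attempt ran instead of caching whole Task objects. A small sketch of the kill/fail decision the patch makes: a user-killed regular attempt becomes *_UNCLEAN so a cleanup attempt can be scheduled for it, while killing a cleanup attempt is final. Names are invented; the enum values mirror TaskStatus.State:

import java.util.*;

public class UncleanStateSketch {
  enum State { RUNNING, FAILED, KILLED, FAILED_UNCLEAN, KILLED_UNCLEAN }

  // taskId -> tracker where its cleanup attempt ran, if any
  static final Map<String, String> cleanupTasks = new TreeMap<>();

  static State onUserAction(String taskId, boolean shouldFail) {
    if (cleanupTasks.containsKey(taskId)) {          // isCleanupAttempt
      return shouldFail ? State.FAILED : State.KILLED;
    }
    return shouldFail ? State.FAILED_UNCLEAN : State.KILLED_UNCLEAN;
  }

  public static void main(String[] args) {
    System.out.println(onUserAction("attempt_1", false)); // KILLED_UNCLEAN
    cleanupTasks.put("attempt_1", "tracker_host1");       // cleanup scheduled
    System.out.println(onUserAction("attempt_1", true));  // FAILED
  }
}
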
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java Thu Feb  5 17:39:27 2009
@@ -80,7 +80,14 @@
   
   private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
       LogName filter) throws IOException {
-    File indexFile = new File(getBaseDir(taskid.toString()), "log.index");
+    return getLogFileDetail(taskid, filter, false);
+  }
+  
+  private static LogFileDetail getLogFileDetail(TaskAttemptID taskid, 
+                                                LogName filter,
+                                                boolean isCleanup) 
+  throws IOException {
+    File indexFile = getIndexFile(taskid.toString(), isCleanup);
     BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
     //the format of the index file is
     //LOG_DIR: <the dir where the task logs are really stored>
@@ -120,8 +127,17 @@
   }
   
   public static File getIndexFile(String taskid) {
-    return new File(getBaseDir(taskid), "log.index");
+    return getIndexFile(taskid, false);
+  }
+  
+  public static File getIndexFile(String taskid, boolean isCleanup) {
+    if (isCleanup) {
+      return new File(getBaseDir(taskid), "log.index.cleanup");
+    } else {
+      return new File(getBaseDir(taskid), "log.index");
+    }
   }
+  
   private static File getBaseDir(String taskid) {
     return new File(LOG_DIR, taskid);
   }
@@ -129,9 +145,10 @@
   private static long prevErrLength;
   private static long prevLogLength;
   
-  private static void writeToIndexFile(TaskAttemptID firstTaskid) 
+  private static void writeToIndexFile(TaskAttemptID firstTaskid,
+                                       boolean isCleanup) 
   throws IOException {
-    File indexFile = getIndexFile(currentTaskid.toString());
+    File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
     BufferedOutputStream bos = 
       new BufferedOutputStream(new FileOutputStream(indexFile,false));
     DataOutputStream dos = new DataOutputStream(bos);
@@ -159,8 +176,17 @@
     prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
   }
   private volatile static TaskAttemptID currentTaskid = null;
+
+  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
+                                           TaskAttemptID taskid) 
+  throws IOException {
+    syncLogs(firstTaskid, taskid, false);
+  }
+  
   @SuppressWarnings("unchecked")
-  public synchronized static void syncLogs(TaskAttemptID firstTaskid, TaskAttemptID taskid) 
+  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
+                                           TaskAttemptID taskid,
+                                           boolean isCleanup) 
   throws IOException {
     System.out.flush();
     System.err.flush();
@@ -179,10 +205,9 @@
       currentTaskid = taskid;
       resetPrevLengths(firstTaskid);
     }
-    writeToIndexFile(firstTaskid);
+    writeToIndexFile(firstTaskid, isCleanup);
   }
   
-  
   /**
    * The filter for userlogs.
    */
@@ -249,6 +274,12 @@
   static class Reader extends InputStream {
     private long bytesRemaining;
     private FileInputStream file;
+
+    public Reader(TaskAttemptID taskid, LogName kind, 
+                  long start, long end) throws IOException {
+      this(taskid, kind, start, end, false);
+    }
+    
     /**
      * Read a log file from start to end positions. The offsets may be negative,
      * in which case they are relative to the end of the file. For example,
@@ -258,12 +289,13 @@
      * @param kind the kind of log to read
      * @param start the offset to read from (negative is relative to tail)
      * @param end the offset to read upto (negative is relative to tail)
+     * @param isCleanup whether the attempt is a cleanup attempt
      * @throws IOException
      */
     public Reader(TaskAttemptID taskid, LogName kind, 
-                  long start, long end) throws IOException {
+                  long start, long end, boolean isCleanup) throws IOException {
       // find the right log file
-      LogFileDetail fileDetail = getTaskLogFileDetail(taskid, kind);
+      LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup);
       // calculate the start and stop
       long size = fileDetail.length;
       if (start < 0) {

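TaskLog gains a parallel index for cleanup attempts: the cleanup run reuses the attempt's log directory but records its offsets in log.index.cleanup rather than log.index, so the two runs' logs can be sliced independently by the Reader. A sketch of the naming scheme; the LogIndexSketch class and the relative userlogs path are assumptions for illustration:

import java.io.File;

public class LogIndexSketch {
  static final File LOG_DIR = new File("userlogs");   // tasktracker log root

  static File getIndexFile(String taskid, boolean isCleanup) {
    File base = new File(LOG_DIR, taskid);
    return new File(base, isCleanup ? "log.index.cleanup" : "log.index");
  }

  public static void main(String[] args) {
    String attempt = "attempt_200902051739_0001_m_000000_0";
    System.out.println(getIndexFile(attempt, false)); // .../log.index
    System.out.println(getIndexFile(attempt, true));  // .../log.index.cleanup
  }
}
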
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java Thu Feb  5 17:39:27 2009
@@ -104,7 +104,8 @@
   private void printTaskLog(HttpServletResponse response,
                             OutputStream out, TaskAttemptID taskId, 
                             long start, long end, boolean plainText, 
-                            TaskLog.LogName filter) throws IOException {
+                            TaskLog.LogName filter, boolean isCleanup) 
+  throws IOException {
     if (!plainText) {
       out.write(("<br><b><u>" + filter + " logs</u></b><br>\n" +
                  "<pre>\n").getBytes());
@@ -112,7 +113,7 @@
 
     try {
       InputStream taskLogReader = 
-        new TaskLog.Reader(taskId, filter, start, end);
+        new TaskLog.Reader(taskId, filter, start, end, isCleanup);
       byte[] b = new byte[65536];
       int result;
       while (true) {
@@ -159,6 +160,7 @@
     long end = -1;
     boolean plainText = false;
     TaskLog.LogName filter = null;
+    boolean isCleanup = false;
 
     String taskIdStr = request.getParameter("taskid");
     if (taskIdStr == null) {
@@ -193,7 +195,12 @@
     if (sPlainText != null) {
       plainText = Boolean.valueOf(sPlainText);
     }
-
+    
+    String sCleanup = request.getParameter("cleanup");
+    if (sCleanup != null) {
+      isCleanup = Boolean.valueOf(sCleanup);
+    }
+    
     OutputStream out = response.getOutputStream();
     if( !plainText ) {
       out.write(("<html>\n" +
@@ -203,21 +210,22 @@
 
       if (filter == null) {
         printTaskLog(response, out, taskId, start, end, plainText, 
-                     TaskLog.LogName.STDOUT);
+                     TaskLog.LogName.STDOUT, isCleanup);
         printTaskLog(response, out, taskId, start, end, plainText, 
-                     TaskLog.LogName.STDERR);
+                     TaskLog.LogName.STDERR, isCleanup);
         printTaskLog(response, out, taskId, start, end, plainText, 
-                     TaskLog.LogName.SYSLOG);
+                     TaskLog.LogName.SYSLOG, isCleanup);
         if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) {
           printTaskLog(response, out, taskId, start, end, plainText, 
-                       TaskLog.LogName.DEBUGOUT);
+                       TaskLog.LogName.DEBUGOUT, isCleanup);
         }
         if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) {
           printTaskLog(response, out, taskId, start, end, plainText, 
-                       TaskLog.LogName.PROFILE);
+                       TaskLog.LogName.PROFILE, isCleanup);
         }
       } else {
-        printTaskLog(response, out, taskId, start, end, plainText, filter);
+        printTaskLog(response, out, taskId, start, end, plainText, filter,
+                     isCleanup);
       }
       
       out.write("</body></html>\n".getBytes());
@@ -226,7 +234,8 @@
       response.sendError(HttpServletResponse.SC_BAD_REQUEST,
           "You must supply a value for `filter' (STDOUT, STDERR, or SYSLOG) if you set plainText = true");
     } else {
-      printTaskLog(response, out, taskId, start, end, plainText, filter);
+      printTaskLog(response, out, taskId, start, end, plainText, filter, 
+                   isCleanup);
     } 
   }
 }

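TaskLogServlet threads the same flag through to TaskLog.Reader via an optional cleanup query parameter; a missing parameter means false, so existing log URLs keep working, and a cleanup attempt's logs would be fetched with something like tasklog?taskid=<attempt>&cleanup=true. A tiny sketch of that defaulting convention (invented class, illustrative only):

public class CleanupParamSketch {
  // mirrors the servlet: absent parameter -> default view of the task logs
  static boolean parseCleanup(String sCleanup) {
    return sCleanup != null && Boolean.valueOf(sCleanup);
  }

  public static void main(String[] args) {
    System.out.println(parseCleanup(null));    // false: regular attempt logs
    System.out.println(parseCleanup("true"));  // true: cleanup attempt logs
  }
}
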
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Thu Feb  5 17:39:27 2009
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
+import java.io.File;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -27,9 +27,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
@@ -71,11 +68,11 @@
         ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
   }
 
-  public void addTask(TaskAttemptID tid, long memLimit) {
+  public void addTask(TaskAttemptID tid, long memLimit, String pidFile) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
       ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
-          sleepTimeBeforeSigKill);
+          sleepTimeBeforeSigKill, pidFile);
       tasksToBeAdded.put(tid, ptInfo);
     }
   }
@@ -91,9 +88,11 @@
     private String pid;
     private ProcfsBasedProcessTree pTree;
     private long memLimit;
+    private String pidFile;
 
     public ProcessTreeInfo(TaskAttemptID tid, String pid,
-        ProcfsBasedProcessTree pTree, long memLimit, long sleepTimeBeforeSigKill) {
+        ProcfsBasedProcessTree pTree, long memLimit, 
+        long sleepTimeBeforeSigKill, String pidFile) {
       this.tid = tid;
       this.pid = pid;
       this.pTree = pTree;
@@ -101,6 +100,7 @@
         this.pTree.setSigKillInterval(sleepTimeBeforeSigKill);
       }
       this.memLimit = memLimit;
+      this.pidFile = pidFile;
     }
 
     public TaskAttemptID getTID() {
@@ -171,7 +171,8 @@
 
         // Initialize any uninitialized processTrees
         if (pId == null) {
-          pId = getPid(tid); // get pid from pid-file
+          // get pid from pid-file
+          pId = getPid(ptInfo.pidFile); 
           if (pId != null) {
             // PID will be null, either if the pid file is yet to be created
             // or if the tip is finished and we removed pidFile, but the TIP
@@ -309,47 +310,13 @@
   /**
    * Load pid of the task from the pidFile.
    * 
-   * @param tipID
+   * @param pidFileName
    * @return the pid of the task process.
    */
-  private String getPid(TaskAttemptID tipID) {
-    Path pidFileName = getPidFilePath(tipID, taskTracker.getJobConf());
-    if (pidFileName == null) {
-      return null;
-    }
-    return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
-  }
-
-  private static LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator("mapred.local.dir");
-
-  /**
-   * Get the pidFile path of a Task
-   * @param tipID
-   * @return pidFile's Path
-   */
-  public static Path getPidFilePath(TaskAttemptID tipID, JobConf conf) {
-    Path pidFileName = null;
-    try {
-      //this actually need not use a localdirAllocator since the PID
-      //files are really small..
-      pidFileName = lDirAlloc.getLocalPathToRead(
-          (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
-          conf);
-    } catch (IOException i) {
-      // PID file is not there
-      LOG.debug("Failed to get pidFile name for " + tipID);
-    }
-    return pidFileName;
-  }
-  public void removePidFile(TaskAttemptID tid) {
-    if (taskTracker.isTaskMemoryManagerEnabled()) {
-      Path pidFilePath = getPidFilePath(tid, taskTracker.getJobConf());
-      if (pidFilePath != null) {
-        try {
-          FileSystem.getLocal(taskTracker.getJobConf()).delete(pidFilePath, false);
-        } catch(IOException ie) {}
-      }
-    }
+  private String getPid(String pidFileName) {
+    if ((new File(pidFileName)).exists()) {
+      return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName);
+    }
+    return null;
   }
 }

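With the pid-file path now handed to the memory manager when the task is registered (see the TaskRunner hunk below), getPid reduces to an existence check plus a read, and the LocalDirAllocator lookup and removePidFile helper go away. A sketch of that simplified lookup, using Files.readAllLines as a stand-in for ProcfsBasedProcessTree.getPidFromPidFile:

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class PidFileSketch {
  static String getPid(String pidFileName) throws IOException {
    if (new File(pidFileName).exists()) {
      // the pid file holds a single line containing the process id
      return Files.readAllLines(Paths.get(pidFileName)).get(0).trim();
    }
    return null;   // file not created yet, or already cleaned up
  }

  public static void main(String[] args) throws IOException {
    Files.write(Paths.get("task.pid"), "12345\n".getBytes());
    System.out.println(getPid("task.pid")); // 12345
  }
}
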
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Thu Feb  5 17:39:27 2009
@@ -113,9 +113,10 @@
                           new Path(conf.getJar()).getParent().toString());
       }
       File workDir = new File(lDirAlloc.getLocalPathToRead(
-                                TaskTracker.getJobCacheSubdir() 
-                                + Path.SEPARATOR + t.getJobID() 
-                                + Path.SEPARATOR + t.getTaskID()
+                                TaskTracker.getLocalTaskDir( 
+                                  t.getJobID().toString(), 
+                                  t.getTaskID().toString(),
+                                  t.isTaskCleanupTask())
                                 + Path.SEPARATOR + MRConstants.WORKDIR,
                                 conf). toString());
 
@@ -374,12 +375,12 @@
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid.toString());                      // pass task identifier
 
-      String pidFile = null;
-      if (tracker.isTaskMemoryManagerEnabled()) {
-        pidFile = lDirAlloc.getLocalPathForWrite(
-            (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + taskid),
+      String pidFile = lDirAlloc.getLocalPathForWrite(
+            (TaskTracker.getPidFile(t.getJobID().toString(), 
+             taskid.toString(), t.isTaskCleanupTask())),
             this.conf).toString();
-      }
+      t.setPidFile(pidFile);
+      tracker.addToMemoryManager(t.getTaskID(), conf, pidFile);
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
@@ -458,7 +459,7 @@
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tracker.reportTaskFinished(t.getTaskID(), false);
+      tip.reportTaskFinished();
     }
   }
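
Finally, TaskRunner resolves the attempt's working directory through TaskTracker.getLocalTaskDir(jobId, taskId, isTaskCleanupTask), so a task-cleanup attempt works in its own directory rather than in the original attempt's, and it always allocates a pid file and registers the task with the memory manager. The ".cleanup" suffix below is an assumption about how that helper separates the two directories; the sketch is illustrative only:

public class TaskDirSketch {
  static String getLocalTaskDir(String jobId, String taskId,
                                boolean isCleanupAttempt) {
    String dir = "taskTracker/jobcache/" + jobId + "/" + taskId;
    // a cleanup attempt gets a sibling directory so it cannot trample
    // the original attempt's files
    return isCleanupAttempt ? dir + ".cleanup" : dir;
  }

  public static void main(String[] args) {
    System.out.println(getLocalTaskDir("job_1", "attempt_1_m_000000_0", false));
    System.out.println(getLocalTaskDir("job_1", "attempt_1_m_000000_0", true));
  }
}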