You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/05 18:24:12 UTC

svn commit: r741192 [2/2] - in /hadoop/core/trunk: ./ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/lib/output/ src/test/org/apache/hadoop/mapred/ src/webapps/job/

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Thu Feb  5 17:24:11 2009
@@ -41,7 +41,7 @@
 
   // what state is the task in?
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING}
+                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
     
   private final TaskAttemptID taskid;
   private float progress;
@@ -204,6 +204,12 @@
     }
     this.phase = phase; 
   }
+
+  boolean inTaskCleanupPhase() {
+    return (this.phase == TaskStatus.Phase.CLEANUP && 
+      (this.runState == TaskStatus.State.FAILED_UNCLEAN || 
+      this.runState == TaskStatus.State.KILLED_UNCLEAN));
+  }
   
   public boolean getIncludeCounters() {
     return includeCounters; 
@@ -261,9 +267,9 @@
   /**
    * Update the status of the task.
    * 
+   * @param runState
    * @param progress
    * @param state
-   * @param phase
    * @param counters
    */
   synchronized void statusUpdate(State runState, 
@@ -300,7 +306,33 @@
     this.counters = status.getCounters();
     this.outputSize = status.outputSize;
   }
-  
+
+  /**
+   * Update specific fields of task status
+   * 
+   * This update is done in the JobTracker when a cleanup attempt of a task
+   * reports its status. Only specific fields are updated, not all of them.
+   * 
+   * @param runState
+   * @param progress
+   * @param state
+   * @param phase
+   * @param finishTime
+   */
+  synchronized void statusUpdate(State runState, 
+                                 float progress,
+                                 String state, 
+                                 Phase phase,
+                                 long finishTime) {
+    setRunState(runState);
+    setProgress(progress);
+    setStateString(state);
+    setPhase(phase);
+    if (finishTime != 0) {
+      this.finishTime = finishTime; 
+    }
+  }
+
   /**
    * Clear out transient information after sending out a status-update
    * from either the {@link Task} to the {@link TaskTracker} or from the

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Feb  5 17:24:11 2009
@@ -183,7 +183,8 @@
   private static final String SUBDIR = "taskTracker";
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
-  private static final String PIDDIR = "pids";
+  private static final String PID = "pid";
+  private static final String OUTPUT = "output";
   private JobConf originalConf;
   private JobConf fConf;
   private int maxCurrentMapTasks;
@@ -412,38 +413,63 @@
   static String getJobCacheSubdir() {
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
+  
+  static String getLocalJobDir(String jobid) {
+	return getJobCacheSubdir() + Path.SEPARATOR + jobid; 
+  }
 
-  static String getPidFilesSubdir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR;
+  static String getLocalTaskDir(String jobid, String taskid) {
+	return getLocalTaskDir(jobid, taskid, false) ; 
   }
- 
+
+  static String getIntermediateOutputDir(String jobid, String taskid) {
+	return getLocalTaskDir(jobid, taskid) 
+           + Path.SEPARATOR + TaskTracker.OUTPUT ; 
+  }
+
+  static String getLocalTaskDir(String jobid, 
+                                String taskid, 
+                                boolean isCleanupAttempt) {
+	String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+	if (isCleanupAttempt) { 
+      taskDir = taskDir + ".cleanup";
+	}
+	return taskDir;
+  }
+
+  static String getPidFile(String jobid, 
+                           String taskid, 
+                           boolean isCleanup) {
+    return  getLocalTaskDir(jobid, taskid, isCleanup)
+            + Path.SEPARATOR + PID;
+  }
+
   /**
    * Get the pidFile path of a Task
+   * 
    * @param tid the TaskAttemptID of the task for which pidFile's path is needed
+   * @param conf Configuration for local dir allocator
+   * @param isCleanup true if the task is a cleanup attempt
+   *  
    * @return pidFile's Path
    */
-  public static Path getPidFilePath(TaskAttemptID tid, JobConf conf) {
+  static Path getPidFilePath(TaskAttemptID tid, 
+                             JobConf conf, 
+                             boolean isCleanup) {
     Path pidFileName = null;
     try {
       //this actually need not use a localdirAllocator since the PID
       //files are really small..
       pidFileName = lDirAlloc.getLocalPathToRead(
-          (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tid),
-          conf);
+        getPidFile(tid.getJobID().toString(), tid.toString(), isCleanup),
+        conf);
     } catch (IOException i) {
       // PID file is not there
-      LOG.warn("Failed to get pidFile name for " + tid + " " + i);
+      LOG.warn("Failed to get pidFile name for " + tid + " " + 
+                StringUtils.stringifyException(i));
     }
     return pidFileName;
   }
-  public void removePidFile(TaskAttemptID tid) {
-    Path pidFilePath = getPidFilePath(tid, getJobConf());
-    if (pidFilePath != null) {
-      try {
-        FileSystem.getLocal(getJobConf()).delete(pidFilePath, false);
-      } catch(IOException ie) {}
-    }
-  }
   
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
@@ -785,9 +811,9 @@
     } catch(FileNotFoundException fe) {
       jobFileSize = -1;
     }
-    Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
-                                    + Path.SEPARATOR + jobId 
-                                    + Path.SEPARATOR + "job.xml"),
+    Path localJobFile = lDirAlloc.getLocalPathForWrite(
+                                    getLocalJobDir(jobId.toString())
+                                    + Path.SEPARATOR + "job.xml",
                                     jobFileSize, fConf);
     RunningJob rjob = addTaskToJob(jobId, tip);
     synchronized (rjob) {
@@ -811,9 +837,9 @@
         
         // create the 'work' directory
         // job-specific shared directory for use as scratch space 
-        Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
-                       + Path.SEPARATOR + jobId 
-                       + Path.SEPARATOR + "work"), fConf);
+        Path workDir = lDirAlloc.getLocalPathForWrite(
+                         (getLocalJobDir(jobId.toString())
+                         + Path.SEPARATOR + "work"), fConf);
         if (!localFs.mkdirs(workDir)) {
           throw new IOException("Mkdirs failed to create " 
                       + workDir.toString());
@@ -835,8 +861,7 @@
           // Here we check for and we check five times the size of jarFileSize
           // to accommodate for unjarring the jar file in work directory 
           localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
-                                     getJobCacheSubdir()
-                                     + Path.SEPARATOR + jobId 
+                                     getLocalJobDir(jobId.toString())
                                      + Path.SEPARATOR + "jars",
                                      5 * jarFileSize, fConf), "job.jar");
           if (!localFs.mkdirs(localJarFile.getParent())) {
@@ -1253,7 +1278,8 @@
       for (TaskStatus taskStatus : status.getTaskReports()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
-            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            !taskStatus.inTaskCleanupPhase()) {
           if (taskStatus.getIsMap()) {
             mapTotal--;
           } else {
@@ -1370,7 +1396,8 @@
     long now = System.currentTimeMillis();
     for (TaskInProgress tip: runningTasks.values()) {
       if (tip.getRunState() == TaskStatus.State.RUNNING ||
-          tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+          tip.isCleaningup()) {
         // Check the per-job timeout interval for tasks;
         // an interval of '0' implies it is never timed-out
         long jobTaskTimeout = tip.getTaskTimeout();
@@ -1424,8 +1451,7 @@
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
           directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
-                                   SUBDIR + Path.SEPARATOR + JOBCACHE + 
-                                   Path.SEPARATOR +  rjob.getJobID()));
+            getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -1684,7 +1710,9 @@
           }
           synchronized (tip) {
             //to make sure that there is no kill task action for this
-            if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
+            if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
+                tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+                tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
               //got killed externally while still in the launcher queue
               addFreeSlot();
               continue;
@@ -1705,7 +1733,8 @@
   private TaskInProgress registerTask(LaunchTaskAction action, 
       TaskLauncher launcher) {
     Task t = action.getTask();
-    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
+    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
+             " task's state:" + t.getState());
     TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
     synchronized (this) {
       tasks.put(t.getTaskID(), tip);
@@ -1727,10 +1756,6 @@
   private void startNewTask(TaskInProgress tip) {
     try {
       localizeJob(tip);
-      if (isTaskMemoryManagerEnabled()) {
-        taskMemoryManager.addTask(tip.getTask().getTaskID(), 
-            getVirtualMemoryForTask(tip.getJobConf()));
-      }
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -1751,7 +1776,23 @@
       }
     }
   }
-    
+  
+  void addToMemoryManager(TaskAttemptID attemptId, 
+                          JobConf conf, 
+                          String pidFile) {
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.addTask(attemptId, 
+        getVirtualMemoryForTask(conf), pidFile);
+    }
+  }
+
+  void removeFromMemoryManager(TaskAttemptID attemptId) {
+    // Remove the entry from taskMemoryManagerThread's data structures.
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.removeTask(attemptId);
+    }
+  }
+
   /**
    * The server retry loop.  
    * This while-loop attempts to connect to the JobTracker.  It only 
@@ -1838,10 +1879,12 @@
       localJobConf = null;
       taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
                                                0.0f, 
-                                               TaskStatus.State.UNASSIGNED, 
+                                               task.getState(),
                                                diagnosticInfo.toString(), 
                                                "initializing",  
                                                getName(), 
+                                               task.isTaskCleanupTask() ? 
+                                                 TaskStatus.Phase.CLEANUP :  
                                                task.isMapTask()? TaskStatus.Phase.MAP:
                                                TaskStatus.Phase.SHUFFLE,
                                                task.getCounters()); 
@@ -1851,9 +1894,10 @@
     private void localizeTask(Task task) throws IOException{
 
       Path localTaskDir = 
-        lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() + 
-                    Path.SEPARATOR + task.getJobID() + Path.SEPARATOR +
-                    task.getTaskID()), defaultJobConf );
+        lDirAlloc.getLocalPathForWrite(
+          TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
+            task.getTaskID().toString(), task.isTaskCleanupTask()), 
+          defaultJobConf );
       
       FileSystem localFs = FileSystem.getLocal(fConf);
       if (!localFs.mkdirs(localTaskDir)) {
@@ -1863,8 +1907,7 @@
 
       // create symlink for ../work if it already doesnt exist
       String workDir = lDirAlloc.getLocalPathToRead(
-                         TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobID() 
+                         TaskTracker.getLocalJobDir(task.getJobID().toString())
                          + Path.SEPARATOR  
                          + "work", defaultJobConf).toString();
       String link = localTaskDir.getParent().toString() 
@@ -1875,11 +1918,10 @@
       
       // create the working-directory of the task 
       Path cwd = lDirAlloc.getLocalPathForWrite(
-                         TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobID() 
-                         + Path.SEPARATOR + task.getTaskID()
-                         + Path.SEPARATOR + MRConstants.WORKDIR,
-                         defaultJobConf);
+                   getLocalTaskDir(task.getJobID().toString(), 
+                      task.getTaskID().toString(), task.isTaskCleanupTask()) 
+                   + Path.SEPARATOR + MRConstants.WORKDIR,
+                   defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
         throw new IOException("Mkdirs failed to create " 
                     + cwd.toString());
@@ -1974,9 +2016,13 @@
      * Kick off the task execution
      */
     public synchronized void launchTask() throws IOException {
-      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
+          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
         localizeTask(task);
-        this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+          this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+        }
         this.runner = task.createRunner(TaskTracker.this, this);
         this.runner.start();
         this.taskStatus.setStartTime(System.currentTimeMillis());
@@ -1986,6 +2032,10 @@
       }
     }
 
+    boolean isCleaningup() {
+   	  return this.taskStatus.inTaskCleanupPhase();
+    }
+    
     /**
      * The task is reporting its progress
      */
@@ -1993,10 +2043,14 @@
     {
       LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + 
           "% " + taskStatus.getStateString());
-      
+      // A task reports its state as COMMIT_PENDING both while it is
+      // waiting for the commit response and while it is committing.
+      // A cleanup attempt reports its state as
+      // FAILED_UNCLEAN/KILLED_UNCLEAN.
       if (this.done || 
           (this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
-          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
+          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+          !isCleaningup())) {
         //make sure we ignore progress messages after a task has 
         //invoked TaskUmbilicalProtocol.done() or if the task has been
         //KILLED/FAILED
@@ -2047,7 +2101,16 @@
      * The task is reporting that it's done running
      */
     public synchronized void reportDone() {
-      this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      if (isCleaningup()) {
+        if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+          this.taskStatus.setRunState(TaskStatus.State.FAILED);
+        } else if (this.taskStatus.getRunState() == 
+                   TaskStatus.State.KILLED_UNCLEAN) {
+          this.taskStatus.setRunState(TaskStatus.State.KILLED);
+        }
+      } else {
+        this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      }
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
@@ -2062,6 +2125,11 @@
       return wasKilled;
     }
 
+    void reportTaskFinished() {
+      taskFinished();
+      releaseSlot();
+    }
+
     /**
      * The task has actually finished running.
      */
@@ -2088,7 +2156,23 @@
         if (!done) {
           if (!wasKilled) {
             failures += 1;
-            taskStatus.setRunState(TaskStatus.State.FAILED);
+            /* State changes:
+             * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED
+             * FAILED_UNCLEAN -> FAILED 
+             * KILLED_UNCLEAN -> KILLED 
+             */
+            if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+              taskStatus.setRunState(TaskStatus.State.FAILED);
+            } else if (taskStatus.getRunState() == 
+                       TaskStatus.State.KILLED_UNCLEAN) {
+              taskStatus.setRunState(TaskStatus.State.KILLED);
+            } else if (task.isMapOrReduce() && 
+                       taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
+              taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
+            } else {
+              taskStatus.setRunState(TaskStatus.State.FAILED);
+            }
+            removeFromMemoryManager(task.getTaskID());
             // call the script here for the failed tasks.
             if (debugCommand != null) {
               String taskStdout ="";
@@ -2114,9 +2198,10 @@
               File workDir = null;
               try {
                 workDir = new File(lDirAlloc.getLocalPathToRead(
-                                     TaskTracker.getJobCacheSubdir() 
-                                     + Path.SEPARATOR + task.getJobID() 
-                                     + Path.SEPARATOR + task.getTaskID()
+                                     TaskTracker.getLocalTaskDir( 
+                                       task.getJobID().toString(), 
+                                       task.getTaskID().toString(),
+                                       task.isTaskCleanupTask())
                                      + Path.SEPARATOR + MRConstants.WORKDIR,
                                      localJobConf). toString());
               } catch (IOException e) {
@@ -2169,14 +2254,14 @@
                 LOG.warn("Exception in add diagnostics!");
               }
             }
-          } else {
-            taskStatus.setRunState(TaskStatus.State.KILLED);
           }
           taskStatus.setProgress(0.0f);
         }
         this.taskStatus.setFinishTime(System.currentTimeMillis());
         needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED || 
-                       taskStatus.getRunState() == TaskStatus.State.KILLED);
+                taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+                taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN || 
+                taskStatus.getRunState() == TaskStatus.State.KILLED);
       }
 
       //
@@ -2286,7 +2371,8 @@
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
             getRunState() == TaskStatus.State.UNASSIGNED ||
-            getRunState() == TaskStatus.State.COMMIT_PENDING) {
+            getRunState() == TaskStatus.State.COMMIT_PENDING ||
+            isCleaningup()) {
           kill(wasFailure);
         }
       }
@@ -2297,19 +2383,46 @@
 
     /**
      * Something went wrong and the task must be killed.
+     * 
+     * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+     * FAILED_UNCLEAN -> FAILED 
+     * KILLED_UNCLEAN -> KILLED
+     * UNASSIGNED -> FAILED/KILLED
      * @param wasFailure was it a failure (versus a kill request)?
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
+      /* State changes:
+       * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED
+       * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+       * FAILED_UNCLEAN -> FAILED 
+       * KILLED_UNCLEAN -> KILLED
+       * UNASSIGNED -> FAILED/KILLED 
+       */
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
-          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+          isCleaningup()) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
         }
         runner.kill();
-        taskStatus.setRunState((wasFailure) ? 
-                                  TaskStatus.State.FAILED : 
-                                  TaskStatus.State.KILLED);
+        if (task.isMapOrReduce()) {
+          taskStatus.setRunState((wasFailure) ? 
+                                    TaskStatus.State.FAILED_UNCLEAN : 
+                                    TaskStatus.State.KILLED_UNCLEAN);
+        } else {
+          // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
+          if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+            taskStatus.setRunState(TaskStatus.State.FAILED);
+          } else if (taskStatus.getRunState() == 
+                     TaskStatus.State.KILLED_UNCLEAN) {
+            taskStatus.setRunState(TaskStatus.State.KILLED);
+          } else {
+            taskStatus.setRunState((wasFailure) ? 
+                                      TaskStatus.State.FAILED : 
+                                      TaskStatus.State.KILLED);
+          }
+        }
       } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
         if (wasFailure) {
           failures += 1;
@@ -2318,6 +2431,7 @@
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
       }
+      removeFromMemoryManager(task.getTaskID());
       releaseSlot();
     }
     
@@ -2369,7 +2483,12 @@
 
       synchronized (TaskTracker.this) {
         if (needCleanup) {
-          tasks.remove(taskId);
+          // see if tasks data structure is holding this tip.
+          // tasks could hold the tip for cleanup attempt, if cleanup attempt 
+          // got launched before this method.
+          if (tasks.get(taskId) == this) {
+            tasks.remove(taskId);
+          }
         }
         synchronized (this){
           if (alwaysKeepTaskFiles ||
@@ -2381,8 +2500,8 @@
       }
       synchronized (this) {
         try {
-          String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR
-                           + task.getJobID() + Path.SEPARATOR + taskId;
+          String taskDir = getLocalTaskDir(task.getJobID().toString(),
+                             taskId.toString(), task.isTaskCleanupTask());
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
@@ -2603,15 +2722,10 @@
     }
     if (tip != null) {
       if (!commitPending) {
-        tip.taskFinished();
-        // Remove the entry from taskMemoryManagerThread's data structures.
-        if (isTaskMemoryManagerEnabled()) {
-          taskMemoryManager.removeTask(taskid);
-        }
-        tip.releaseSlot();
+        tip.reportTaskFinished();
       }
     } else {
-      LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
+      LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
     }
   }
   
@@ -2838,15 +2952,13 @@
 
         // Index file
         Path indexFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
-            jobId + Path.SEPARATOR +
-            mapId + "/output" + "/file.out.index", conf);
+            TaskTracker.getIntermediateOutputDir(jobId, mapId)
+            + "/file.out.index", conf);
         
         // Map-output file
         Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
-            jobId + Path.SEPARATOR +
-            mapId + "/output" + "/file.out", conf);
+            TaskTracker.getIntermediateOutputDir(jobId, mapId)
+            + "/file.out", conf);
 
         /**
          * Read the index file to get the information about where

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Feb  5 17:24:11 2009
@@ -250,7 +250,8 @@
       TaskStatus.State state = ts.getRunState();
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           ts.inTaskCleanupPhase())) {
         mapCount++;
       }
     }
@@ -267,7 +268,8 @@
       TaskStatus.State state = ts.getRunState();
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           ts.inTaskCleanupPhase())) {
         reduceCount++;
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Thu Feb  5 17:24:11 2009
@@ -52,9 +52,10 @@
    *            encapsulates the events and whether to reset events index.
    * Version 13 changed the getTask method signature for HADOOP-249
    * Version 14 changed the getTask method signature for HADOOP-4232
+   * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
    * */
 
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   /**
    * Called when a child task process starts, to get its task.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Thu Feb  5 17:24:11 2009
@@ -174,8 +174,10 @@
   @Override
   public void abortTask(TaskAttemptContext context) {
     try {
-      context.progress();
-      outputFileSystem.delete(workPath, true);
+      if (workPath != null) { 
+        context.progress();
+        outputFileSystem.delete(workPath, true);
+      }
     } catch (IOException ie) {
       LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
     }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Thu Feb  5 17:24:11 2009
@@ -110,7 +110,7 @@
         JobConf confForThisTask = new JobConf(conf);
         confForThisTask.set("mapred.local.dir", localDir);//set the localDir
 
-        Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask);
+        Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false);
         while (pidFilePath == null) {
           //wait till the pid file is created
           try {
@@ -119,7 +119,7 @@
             LOG.warn("sleep is interrupted:" + ie);
             break;
           }
-          pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask);
+          pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false);
         }
 
         pid = ProcessTree.getPidFromPidFile(pidFilePath.toString());

Modified: hadoop/core/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/taskdetails.jsp?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/taskdetails.jsp Thu Feb  5 17:24:11 2009
@@ -67,13 +67,19 @@
         }
       }
     }
-    TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
-        : null;
+    TaskInProgress tip = null;
+    if (job != null && tipidObj != null) {
+      tip = job.getTaskInProgress(tipidObj);
+    }
+    TaskStatus[] ts = null;
+    if (tip != null) { 
+      ts = tip.getTaskStatuses();
+    }
     boolean isCleanupOrSetup = false;
-    if (tipidObj != null) { 
-      isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+    if ( tip != null) {
+      isCleanupOrSetup = tip.isJobCleanupTask();
       if (!isCleanupOrSetup) {
-        isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+        isCleanupOrSetup = tip.isJobSetupTask();
       }
     }
 %>
@@ -115,14 +121,41 @@
       TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName);
       out.print("<tr><td>" + status.getTaskID() + "</td>");
       String taskAttemptTracker = null;
+      String cleanupTrackerName = null;
+      TaskTrackerStatus cleanupTracker = null;
+      String cleanupAttemptTracker = null;
+      boolean hasCleanupAttempt = false;
+      if (tip != null && tip.isCleanupAttempt(status.getTaskID())) {
+        cleanupTrackerName = tip.machineWhereCleanupRan(status.getTaskID());
+        cleanupTracker = tracker.getTaskTracker(cleanupTrackerName);
+        if (cleanupTracker != null) {
+          cleanupAttemptTracker = "http://" + cleanupTracker.getHost() + ":"
+            + cleanupTracker.getHttpPort();
+        }
+        hasCleanupAttempt = true;
+      }
+      out.print("<td>");
+      if (hasCleanupAttempt) {
+        out.print("Task attempt: ");
+      }
       if (taskTracker == null) {
-        out.print("<td>" + taskTrackerName + "</td>");
+        out.print(taskTrackerName);
       } else {
         taskAttemptTracker = "http://" + taskTracker.getHost() + ":"
           + taskTracker.getHttpPort();
-        out.print("<td><a href=\"" + taskAttemptTracker + "\">"
-          + tracker.getNode(taskTracker.getHost()) + "</a></td>");
+        out.print("<a href=\"" + taskAttemptTracker + "\">"
+          + tracker.getNode(taskTracker.getHost()) + "</a>");
+      }
+      if (hasCleanupAttempt) {
+        out.print("<br/>Cleanup Attempt: ");
+        if (cleanupAttemptTracker == null ) {
+          out.print(cleanupTrackerName);
+        } else {
+          out.print("<a href=\"" + cleanupAttemptTracker + "\">"
+            + tracker.getNode(cleanupTracker.getHost()) + "</a>");
         }
+      }
+      out.print("</td>");
         out.print("<td>" + status.getRunState() + "</td>");
         out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2)
           + ServletUtil.percentageGraph(status.getProgress() * 100f, 80) + "</td>");
@@ -162,6 +195,9 @@
         						String.valueOf(taskTracker.getHttpPort()),
         						status.getTaskID().toString());
       	}
+        if (hasCleanupAttempt) {
+          out.print("Task attempt: <br/>");
+        }
         if (taskLogUrl == null) {
           out.print("n/a");
         } else {
@@ -172,6 +208,25 @@
           out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
           out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
         }
+        if (hasCleanupAttempt) {
+          out.print("Cleanup attempt: <br/>");
+          taskLogUrl = null;
+          if (cleanupTracker != null ) {
+        	taskLogUrl = TaskLogServlet.getTaskLogUrl(cleanupTracker.getHost(),
+                                String.valueOf(cleanupTracker.getHttpPort()),
+                                status.getTaskID().toString());
+      	  }
+          if (taskLogUrl == null) {
+            out.print("n/a");
+          } else {
+            String tailFourKBUrl = taskLogUrl + "&start=-4097&cleanup=true";
+            String tailEightKBUrl = taskLogUrl + "&start=-8193&cleanup=true";
+            String entireLogUrl = taskLogUrl + "&all=true&cleanup=true";
+            out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
+            out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
+            out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
+          }
+        }
         out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
           + "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">"
           + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");