You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/05 18:34:11 UTC

svn commit: r741197 [2/2] - in /hadoop/core/branches/branch-0.19: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Thu Feb  5 17:34:10 2009
@@ -41,7 +41,7 @@
 
   // what state is the task in?
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING}
+                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
     
   private TaskAttemptID taskid;
   private float progress;
@@ -202,6 +202,12 @@
     }
     this.phase = phase; 
   }
+
+  boolean inTaskCleanupPhase() {
+    return (this.phase == TaskStatus.Phase.CLEANUP && 
+      (this.runState == TaskStatus.State.FAILED_UNCLEAN || 
+      this.runState == TaskStatus.State.KILLED_UNCLEAN));
+  }
   
   public boolean getIncludeCounters() {
     return includeCounters; 
@@ -259,9 +265,9 @@
   /**
    * Update the status of the task.
    * 
+   * @param runState
    * @param progress
    * @param state
-   * @param phase
    * @param counters
    */
   synchronized void statusUpdate(State runState, 
@@ -298,7 +304,33 @@
     this.counters = status.getCounters();
     this.outputSize = status.outputSize;
   }
-  
+
+  /**
+   * Update specific fields of task status
+   * 
+   * This update is done in the JobTracker when a cleanup attempt of a task
+   * reports its status; only specific fields are updated, not all of them.
+   * 
+   * @param runState
+   * @param progress
+   * @param state
+   * @param phase
+   * @param finishTime
+   */
+  synchronized void statusUpdate(State runState, 
+                                 float progress,
+                                 String state, 
+                                 Phase phase,
+                                 long finishTime) {
+    setRunState(runState);
+    setProgress(progress);
+    setStateString(state);
+    setPhase(phase);
+    if (finishTime != 0) {
+      this.finishTime = finishTime; 
+    }
+  }
+
   /**
    * Clear out transient information after sending out a status-update
    * from either the {@link Task} to the {@link TaskTracker} or from the

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Feb  5 17:34:10 2009
@@ -181,7 +181,8 @@
   private static final String SUBDIR = "taskTracker";
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
-  private static final String PIDDIR = "pids";
+  private static final String PID = "pid";
+  private static final String OUTPUT = "output";
   private JobConf originalConf;
   private JobConf fConf;
   private int maxCurrentMapTasks;
@@ -358,10 +359,36 @@
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
 
-  static String getPidFilesSubdir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR;
+  static String getLocalJobDir(String jobid) {
+	return getJobCacheSubdir() + Path.SEPARATOR + jobid; 
   }
-    
+
+  static String getLocalTaskDir(String jobid, String taskid) {
+	return getLocalTaskDir(jobid, taskid, false) ; 
+  }
+
+  static String getIntermediateOutputDir(String jobid, String taskid) {
+	return getLocalTaskDir(jobid, taskid) 
+           + Path.SEPARATOR + TaskTracker.OUTPUT ; 
+  }
+
+  static String getLocalTaskDir(String jobid, 
+                                String taskid, 
+                                boolean isCleanupAttempt) {
+	String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+	if (isCleanupAttempt) { 
+      taskDir = taskDir + ".cleanup";
+	}
+	return taskDir;
+  }
+
+  static String getPidFile(String jobid, 
+                           String taskid, 
+                           boolean isCleanup) {
+    return  getLocalTaskDir(jobid, taskid, isCleanup)
+            + Path.SEPARATOR + PID;
+  }
+
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
     if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
@@ -699,9 +726,9 @@
     } catch(FileNotFoundException fe) {
       jobFileSize = -1;
     }
-    Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
-                                    + Path.SEPARATOR + jobId 
-                                    + Path.SEPARATOR + "job.xml"),
+    Path localJobFile = lDirAlloc.getLocalPathForWrite(
+                                    getLocalJobDir(jobId.toString())
+                                    + Path.SEPARATOR + "job.xml",
                                     jobFileSize, fConf);
     RunningJob rjob = addTaskToJob(jobId, tip);
     synchronized (rjob) {
@@ -725,9 +752,9 @@
         
         // create the 'work' directory
         // job-specific shared directory for use as scratch space 
-        Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
-                       + Path.SEPARATOR + jobId 
-                       + Path.SEPARATOR + "work"), fConf);
+        Path workDir = lDirAlloc.getLocalPathForWrite(
+                         (getLocalJobDir(jobId.toString())
+                         + Path.SEPARATOR + "work"), fConf);
         if (!localFs.mkdirs(workDir)) {
           throw new IOException("Mkdirs failed to create " 
                       + workDir.toString());
@@ -749,8 +776,7 @@
           // Here we check for and we check five times the size of jarFileSize
           // to accommodate for unjarring the jar file in work directory 
           localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
-                                     getJobCacheSubdir()
-                                     + Path.SEPARATOR + jobId 
+                                     getLocalJobDir(jobId.toString())
                                      + Path.SEPARATOR + "jars",
                                      5 * jarFileSize, fConf), "job.jar");
           if (!localFs.mkdirs(localJarFile.getParent())) {
@@ -1164,7 +1190,8 @@
       for (TaskStatus taskStatus : status.getTaskReports()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
-            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            !taskStatus.inTaskCleanupPhase()) {
           if (taskStatus.getIsMap()) {
             mapTotal--;
           } else {
@@ -1281,7 +1308,8 @@
     long now = System.currentTimeMillis();
     for (TaskInProgress tip: runningTasks.values()) {
       if (tip.getRunState() == TaskStatus.State.RUNNING ||
-          tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+          tip.isCleaningup()) {
         // Check the per-job timeout interval for tasks;
         // an interval of '0' implies it is never timed-out
         long jobTaskTimeout = tip.getTaskTimeout();
@@ -1335,8 +1363,7 @@
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
           directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
-                                   SUBDIR + Path.SEPARATOR + JOBCACHE + 
-                                   Path.SEPARATOR +  rjob.getJobID()));
+            getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -1581,7 +1608,9 @@
           }
           synchronized (tip) {
             //to make sure that there is no kill task action for this
-            if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
+            if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
+                tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+                tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
               //got killed externally while still in the launcher queue
               addFreeSlot();
               continue;
@@ -1602,7 +1631,8 @@
   private TaskInProgress registerTask(LaunchTaskAction action, 
       TaskLauncher launcher) {
     Task t = action.getTask();
-    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
+    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
+             " task's state:" + t.getState());
     TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
     synchronized (this) {
       tasks.put(t.getTaskID(), tip);
@@ -1624,10 +1654,6 @@
   private void startNewTask(TaskInProgress tip) {
     try {
       localizeJob(tip);
-      if (isTaskMemoryManagerEnabled()) {
-        taskMemoryManager.addTask(tip.getTask().getTaskID(), 
-            getMemoryForTask(tip.getJobConf()));
-      }
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -1648,7 +1674,23 @@
       }
     }
   }
-    
+  
+  void addToMemoryManager(TaskAttemptID attemptId, 
+                          JobConf conf, 
+                          String pidFile) {
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.addTask(attemptId, 
+        getMemoryForTask(conf), pidFile);
+    }
+  }
+
+  void removeFromMemoryManager(TaskAttemptID attemptId) {
+    // Remove the entry from taskMemoryManagerThread's data structures.
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.removeTask(attemptId);
+    }
+  }
+
   /**
    * The server retry loop.  
    * This while-loop attempts to connect to the JobTracker.  It only 
@@ -1735,10 +1777,12 @@
       localJobConf = null;
       taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
                                                0.0f, 
-                                               TaskStatus.State.UNASSIGNED, 
+                                               task.getState(),
                                                diagnosticInfo.toString(), 
                                                "initializing",  
                                                getName(), 
+                                               task.isTaskCleanupTask() ? 
+                                                 TaskStatus.Phase.CLEANUP :  
                                                task.isMapTask()? TaskStatus.Phase.MAP:
                                                TaskStatus.Phase.SHUFFLE,
                                                task.getCounters()); 
@@ -1748,9 +1792,10 @@
     private void localizeTask(Task task) throws IOException{
 
       Path localTaskDir = 
-        lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() + 
-                    Path.SEPARATOR + task.getJobID() + Path.SEPARATOR +
-                    task.getTaskID()), defaultJobConf );
+        lDirAlloc.getLocalPathForWrite(
+          TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
+            task.getTaskID().toString(), task.isTaskCleanupTask()), 
+          defaultJobConf );
       
       FileSystem localFs = FileSystem.getLocal(fConf);
       if (!localFs.mkdirs(localTaskDir)) {
@@ -1760,8 +1805,7 @@
 
       // create symlink for ../work if it already doesnt exist
       String workDir = lDirAlloc.getLocalPathToRead(
-                         TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobID() 
+                         TaskTracker.getLocalJobDir(task.getJobID().toString())
                          + Path.SEPARATOR  
                          + "work", defaultJobConf).toString();
       String link = localTaskDir.getParent().toString() 
@@ -1772,11 +1816,10 @@
       
       // create the working-directory of the task 
       Path cwd = lDirAlloc.getLocalPathForWrite(
-                         TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobID() 
-                         + Path.SEPARATOR + task.getTaskID()
-                         + Path.SEPARATOR + MRConstants.WORKDIR,
-                         defaultJobConf);
+                   getLocalTaskDir(task.getJobID().toString(), 
+                      task.getTaskID().toString(), task.isTaskCleanupTask()) 
+                   + Path.SEPARATOR + MRConstants.WORKDIR,
+                   defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
         throw new IOException("Mkdirs failed to create " 
                     + cwd.toString());
@@ -1870,9 +1913,13 @@
      * Kick off the task execution
      */
     public synchronized void launchTask() throws IOException {
-      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
+          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
         localizeTask(task);
-        this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+          this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+        }
         this.runner = task.createRunner(TaskTracker.this, this);
         this.runner.start();
         this.taskStatus.setStartTime(System.currentTimeMillis());
@@ -1882,6 +1929,10 @@
       }
     }
 
+    boolean isCleaningup() {
+   	  return this.taskStatus.inTaskCleanupPhase();
+    }
+    
     /**
      * The task is reporting its progress
      */
@@ -1889,10 +1940,14 @@
     {
       LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + 
           "% " + taskStatus.getStateString());
-      
+      // A task reports its state as COMMIT_PENDING both while it is
+      // waiting for the commit response and while it is committing.
+      // A cleanup attempt reports its state as FAILED_UNCLEAN or
+      // KILLED_UNCLEAN.
       if (this.done || 
           (this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
-          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
+          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+          !isCleaningup())) {
         //make sure we ignore progress messages after a task has 
         //invoked TaskUmbilicalProtocol.done() or if the task has been
         //KILLED/FAILED
@@ -1943,7 +1998,16 @@
      * The task is reporting that it's done running
      */
     public synchronized void reportDone() {
-      this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      if (isCleaningup()) {
+        if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+          this.taskStatus.setRunState(TaskStatus.State.FAILED);
+        } else if (this.taskStatus.getRunState() == 
+                   TaskStatus.State.KILLED_UNCLEAN) {
+          this.taskStatus.setRunState(TaskStatus.State.KILLED);
+        }
+      } else {
+        this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      }
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
@@ -1958,6 +2022,11 @@
       return wasKilled;
     }
 
+    void reportTaskFinished() {
+      taskFinished();
+      releaseSlot();
+    }
+
     /**
      * The task has actually finished running.
      */
@@ -1984,7 +2053,23 @@
         if (!done) {
           if (!wasKilled) {
             failures += 1;
-            taskStatus.setRunState(TaskStatus.State.FAILED);
+            /* State changes:
+             * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED
+             * FAILED_UNCLEAN -> FAILED 
+             * KILLED_UNCLEAN -> KILLED 
+             */
+            if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+              taskStatus.setRunState(TaskStatus.State.FAILED);
+            } else if (taskStatus.getRunState() == 
+                       TaskStatus.State.KILLED_UNCLEAN) {
+              taskStatus.setRunState(TaskStatus.State.KILLED);
+            } else if (task.isMapOrReduce() && 
+                       taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
+              taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
+            } else {
+              taskStatus.setRunState(TaskStatus.State.FAILED);
+            }
+            removeFromMemoryManager(task.getTaskID());
             // call the script here for the failed tasks.
             if (debugCommand != null) {
               String taskStdout ="";
@@ -2010,9 +2095,10 @@
               File workDir = null;
               try {
                 workDir = new File(lDirAlloc.getLocalPathToRead(
-                                     TaskTracker.getJobCacheSubdir() 
-                                     + Path.SEPARATOR + task.getJobID() 
-                                     + Path.SEPARATOR + task.getTaskID()
+                                     TaskTracker.getLocalTaskDir( 
+                                       task.getJobID().toString(), 
+                                       task.getTaskID().toString(),
+                                       task.isTaskCleanupTask())
                                      + Path.SEPARATOR + MRConstants.WORKDIR,
                                      localJobConf). toString());
               } catch (IOException e) {
@@ -2065,14 +2151,14 @@
                 LOG.warn("Exception in add diagnostics!");
               }
             }
-          } else {
-            taskStatus.setRunState(TaskStatus.State.KILLED);
           }
           taskStatus.setProgress(0.0f);
         }
         this.taskStatus.setFinishTime(System.currentTimeMillis());
         needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED || 
-                       taskStatus.getRunState() == TaskStatus.State.KILLED);
+                taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+                taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN || 
+                taskStatus.getRunState() == TaskStatus.State.KILLED);
       }
 
       //
@@ -2182,7 +2268,8 @@
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
             getRunState() == TaskStatus.State.UNASSIGNED ||
-            getRunState() == TaskStatus.State.COMMIT_PENDING) {
+            getRunState() == TaskStatus.State.COMMIT_PENDING ||
+            isCleaningup()) {
           kill(wasFailure);
         }
       }
@@ -2196,16 +2283,38 @@
      * @param wasFailure was it a failure (versus a kill request)?
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
+      /* State changes:
+       * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED
+       * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+       * FAILED_UNCLEAN -> FAILED 
+       * KILLED_UNCLEAN -> KILLED
+       * UNASSIGNED -> FAILED/KILLED 
+       */
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
-          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+          isCleaningup()) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
         }
         runner.kill();
-        taskStatus.setRunState((wasFailure) ? 
-                                  TaskStatus.State.FAILED : 
-                                  TaskStatus.State.KILLED);
+        if (task.isMapOrReduce()) {
+          taskStatus.setRunState((wasFailure) ? 
+                                    TaskStatus.State.FAILED_UNCLEAN : 
+                                    TaskStatus.State.KILLED_UNCLEAN);
+        } else {
+          // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
+          if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+            taskStatus.setRunState(TaskStatus.State.FAILED);
+          } else if (taskStatus.getRunState() == 
+                     TaskStatus.State.KILLED_UNCLEAN) {
+            taskStatus.setRunState(TaskStatus.State.KILLED);
+          } else {
+            taskStatus.setRunState((wasFailure) ? 
+                                      TaskStatus.State.FAILED : 
+                                      TaskStatus.State.KILLED);
+          }
+        }
       } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
         if (wasFailure) {
           failures += 1;
@@ -2214,6 +2323,7 @@
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
       }
+      removeFromMemoryManager(task.getTaskID());
       releaseSlot();
     }
     
@@ -2265,7 +2375,12 @@
 
       synchronized (TaskTracker.this) {
         if (needCleanup) {
-          tasks.remove(taskId);
+          // Remove the entry only if the tasks map still holds this tip.
+          // It could instead hold the tip of the cleanup attempt, if the
+          // cleanup attempt got launched before this method ran.
+          if (tasks.get(taskId) == this) {
+            tasks.remove(taskId);
+          }
         }
         synchronized (this){
           if (alwaysKeepTaskFiles ||
@@ -2277,8 +2392,8 @@
       }
       synchronized (this) {
         try {
-          String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR
-                           + task.getJobID() + Path.SEPARATOR + taskId;
+          String taskDir = getLocalTaskDir(task.getJobID().toString(),
+                             taskId.toString(), task.isTaskCleanupTask());
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
@@ -2415,7 +2530,7 @@
   throws IOException {
     LOG.info("Task " + taskid + " is in COMMIT_PENDING");
     statusUpdate(taskid, taskStatus);
-    reportTaskFinished(taskid, true);
+    reportTaskFinished();
   }
   
   /**
@@ -2490,31 +2605,14 @@
   //  Called by TaskTracker thread after task process ends
   /////////////////////////////////////////////////////
   /**
-   * The task is no longer running.  It may not have completed successfully
+   * When a task is reported as finished, wake up the heartbeat.
    */
-  void reportTaskFinished(TaskAttemptID taskid, boolean commitPending) {
-    TaskInProgress tip;
-    synchronized (this) {
-      tip = tasks.get(taskid);
-    }
-    if (tip != null) {
-      if (!commitPending) {
-        tip.taskFinished();
-        // Remove the entry from taskMemoryManagerThread's data structures.
-        if (isTaskMemoryManagerEnabled()) {
-          taskMemoryManager.removeTask(taskid);
-        }
-        tip.releaseSlot();
-      }
-      synchronized(finishedCount) {
-        finishedCount[0]++;
-        finishedCount.notify();
-      }
-    } else {
-      LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
+  void reportTaskFinished() {
+    synchronized(finishedCount) {
+      finishedCount[0]++;
+      finishedCount.notify();
     }
   }
-  
 
   /**
    * A completed map task's output has been lost.
@@ -2740,15 +2838,13 @@
 
         // Index file
         Path indexFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
-            jobId + Path.SEPARATOR +
-            mapId + "/output" + "/file.out.index", conf);
+            TaskTracker.getIntermediateOutputDir(jobId, mapId)
+            + "/file.out.index", conf);
         
         // Map-output file
         Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
-            jobId + Path.SEPARATOR +
-            mapId + "/output" + "/file.out", conf);
+            TaskTracker.getIntermediateOutputDir(jobId, mapId)
+            + "/file.out", conf);
 
         /**
          * Read the index file to get the information about where

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Feb  5 17:34:10 2009
@@ -206,7 +206,8 @@
       TaskStatus.State state = ts.getRunState();
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           ts.inTaskCleanupPhase())) {
         mapCount++;
       }
     }
@@ -223,7 +224,8 @@
       TaskStatus.State state = ts.getRunState();
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           ts.inTaskCleanupPhase())) {
         reduceCount++;
       }
     }

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Thu Feb  5 17:34:10 2009
@@ -52,9 +52,10 @@
    *            encapsulates the events and whether to reset events index.
    * Version 13 changed the getTask method signature for HADOOP-249
    * Version 14 changed the getTask method signature for HADOOP-4232
+   * Version 15 added the FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
    * */
 
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   /**
    * Called when a child task process starts, to get its task.

Added: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=741197&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java (added)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java Thu Feb  5 17:34:10 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestTaskFail extends TestCase {
+  public static class MapperClass extends MapReduceBase
+  implements Mapper<LongWritable, Text, Text, IntWritable> {
+    String taskid;
+    public void configure(JobConf job) {
+      taskid = job.get("mapred.task.id");
+    }
+    public void map (LongWritable key, Text value, 
+                     OutputCollector<Text, IntWritable> output, 
+                     Reporter reporter) throws IOException {
+      if (taskid.endsWith("_0")) {
+        throw new IOException();
+      } else if (taskid.endsWith("_1")) {
+        System.exit(-1);
+      } 
+    }
+  }
+
+  public RunningJob launchJob(JobConf conf,
+                              Path inDir,
+                              Path outDir,
+                              String input) 
+  throws IOException {
+    // set up the input file system and write input text.
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir, true);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    {
+      // write input into input file
+      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+
+    // configure the mapred Job
+    conf.setMapperClass(MapperClass.class);        
+    conf.setReducerClass(IdentityReducer.class);
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+                                    "/tmp")).toString().replace(' ', '+');
+    conf.set("test.build.data", TEST_ROOT_DIR);
+    // return the RunningJob handle.
+    return new JobClient(conf).submitJob(conf);
+  }
+		  
+  public void testWithDFS() throws IOException {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 4;
+
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      fileSys = dfs.getFileSystem();
+      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+      JobConf jobConf = mr.createJobConf();
+      final Path inDir = new Path("./input");
+      final Path outDir = new Path("./output");
+      String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+      RunningJob job = null;
+
+      job = launchJob(jobConf, inDir, outDir, input);
+      // wait for the job to finish.
+      while (!job.isComplete());
+      assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+      
+      JobID jobId = job.getID();
+      // construct the task id of first map task
+      TaskAttemptID attemptId = 
+        new TaskAttemptID(new TaskID(jobId, true, 0), 0);
+      TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+                              getTip(attemptId.getTaskID());
+      // this should not be a cleanup attempt, since the first attempt
+      // fails with an exception
+      assertTrue(!tip.isCleanupAttempt(attemptId));
+      TaskStatus ts = 
+        mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+      assertTrue(ts != null);
+      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+      
+      attemptId =  new TaskAttemptID(new TaskID(jobId, true, 0), 1);
+      // this should be a cleanup attempt, since the second attempt fails
+      // with System.exit
+      assertTrue(tip.isCleanupAttempt(attemptId));
+      ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+      assertTrue(ts != null);
+      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+
+  public static void main(String[] argv) throws Exception {
+    TestTaskFail td = new TestTaskFail();
+    td.testWithDFS();
+  }
+}

Modified: hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp Thu Feb  5 17:34:10 2009
@@ -67,13 +67,19 @@
         }
       }
     }
-    TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
-        : null;
+    TaskInProgress tip = null;
+    if (job != null && tipidObj != null) {
+      tip = job.getTaskInProgress(tipidObj);
+    }
+    TaskStatus[] ts = null;
+    if (tip != null) { 
+      ts = tip.getTaskStatuses();
+    }
     boolean isCleanupOrSetup = false;
-    if (tipidObj != null) { 
-      isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+    if ( tip != null) {
+      isCleanupOrSetup = tip.isJobCleanupTask();
       if (!isCleanupOrSetup) {
-        isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+        isCleanupOrSetup = tip.isJobSetupTask();
       }
     }
 %>
@@ -115,14 +121,41 @@
       TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName);
       out.print("<tr><td>" + status.getTaskID() + "</td>");
       String taskAttemptTracker = null;
+      String cleanupTrackerName = null;
+      TaskTrackerStatus cleanupTracker = null;
+      String cleanupAttemptTracker = null;
+      boolean hasCleanupAttempt = false;
+      if (tip != null && tip.isCleanupAttempt(status.getTaskID())) {
+        cleanupTrackerName = tip.machineWhereCleanupRan(status.getTaskID());
+        cleanupTracker = tracker.getTaskTracker(cleanupTrackerName);
+        if (cleanupTracker != null) {
+          cleanupAttemptTracker = "http://" + cleanupTracker.getHost() + ":"
+            + cleanupTracker.getHttpPort();
+        }
+        hasCleanupAttempt = true;
+      }
+      out.print("<td>");
+      if (hasCleanupAttempt) {
+        out.print("Task attempt: ");
+      }
       if (taskTracker == null) {
-        out.print("<td>" + taskTrackerName + "</td>");
+        out.print(taskTrackerName);
       } else {
         taskAttemptTracker = "http://" + taskTracker.getHost() + ":"
           + taskTracker.getHttpPort();
-        out.print("<td><a href=\"" + taskAttemptTracker + "\">"
-          + tracker.getNode(taskTracker.getHost()) + "</a></td>");
+        out.print("<a href=\"" + taskAttemptTracker + "\">"
+          + tracker.getNode(taskTracker.getHost()) + "</a>");
+      }
+      if (hasCleanupAttempt) {
+        out.print("<br/>Cleanup Attempt: ");
+        if (cleanupAttemptTracker == null ) {
+          out.print(cleanupTrackerName);
+        } else {
+          out.print("<a href=\"" + cleanupAttemptTracker + "\">"
+            + tracker.getNode(cleanupTracker.getHost()) + "</a>");
         }
+      }
+      out.print("</td>");
         out.print("<td>" + status.getRunState() + "</td>");
         out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2)
           + ServletUtil.percentageGraph(status.getProgress() * 100f, 80) + "</td>");
@@ -162,6 +195,9 @@
         						String.valueOf(taskTracker.getHttpPort()),
         						status.getTaskID().toString());
       	}
+        if (hasCleanupAttempt) {
+          out.print("Task attempt: <br/>");
+        }
         if (taskLogUrl == null) {
           out.print("n/a");
         } else {
@@ -172,6 +208,25 @@
           out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
           out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
         }
+        if (hasCleanupAttempt) {
+          out.print("Cleanup attempt: <br/>");
+          taskLogUrl = null;
+          if (cleanupTracker != null ) {
+        	taskLogUrl = TaskLogServlet.getTaskLogUrl(cleanupTracker.getHost(),
+                                String.valueOf(cleanupTracker.getHttpPort()),
+                                status.getTaskID().toString());
+      	  }
+          if (taskLogUrl == null) {
+            out.print("n/a");
+          } else {
+            String tailFourKBUrl = taskLogUrl + "&start=-4097&cleanup=true";
+            String tailEightKBUrl = taskLogUrl + "&start=-8193&cleanup=true";
+            String entireLogUrl = taskLogUrl + "&all=true&cleanup=true";
+            out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
+            out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
+            out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
+          }
+        }
         out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
           + "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">"
           + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");