You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2007/10/10 11:32:50 UTC

svn commit: r583408 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/webapps/job/

Author: acmurthy
Date: Wed Oct 10 02:32:49 2007
New Revision: 583408

URL: http://svn.apache.org/viewvc?rev=583408&view=rev
Log:
HADOOP-1874.  Move task-outputs' promotion/discard to a separate thread distinct from the main heartbeat-processing thread. The main upside being that we do not lock-up the JobTracker during HDFS operations, which otherwise may lead to lost tasktrackers if the NameNode is unresponsive. Contributed by Devaraj Das.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 10 02:32:49 2007
@@ -253,6 +253,12 @@
     HADOOP-1992.  Fix the performance degradation in the sort validator. 
     (acmurthy via omalley)
 
+    HADOOP-1874.  Move task-outputs' promotion/discard to a separate thread
+    distinct from the main heartbeat-processing thread. The main upside being 
+    that we do not lock-up the JobTracker during HDFS operations, which
+    otherwise may lead to lost tasktrackers if the NameNode is unresponsive.
+    (Devaraj Das via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-1908. Restructure data node code so that block sending and 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 10 02:32:49 2007
@@ -37,7 +37,6 @@
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.util.StringUtils;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -380,6 +379,16 @@
       TaskTrackerStatus ttStatus = 
         this.jobtracker.getTaskTracker(status.getTaskTracker());
       String httpTaskLogLocation = null; 
+
+      if (state == TaskStatus.State.COMMIT_PENDING ||
+          state == TaskStatus.State.FAILED ||
+          state == TaskStatus.State.KILLED) {
+        JobWithTaskContext j = new JobWithTaskContext(this, tip, 
+                                                      status.getTaskId(),
+                                                      metrics);
+        jobtracker.addToCommitQueue(j);
+      }
+
       if (null != ttStatus){
         httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" + 
           ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" +
@@ -388,7 +397,7 @@
 
       TaskCompletionEvent taskEvent = null;
       if (state == TaskStatus.State.SUCCEEDED) {
-        boolean complete = false;
+        completedTask(tip, status, metrics);
         taskEvent = new TaskCompletionEvent(
                                             taskCompletionEventTracker, 
                                             status.getTaskId(),
@@ -397,28 +406,14 @@
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                            );
-        try {
-          complete = completedTask(tip, status, metrics);
-        } catch (IOException ioe) {
-          // Oops! Failed to copy the task's output to its final place;
-          // fail the task!
-          failedTask(tip, status.getTaskId(), 
-                     "Failed to copy reduce's output", 
-                     (tip.isMapTask() ? 
-                         TaskStatus.Phase.MAP : 
-                         TaskStatus.Phase.REDUCE), 
-                     TaskStatus.State.FAILED, 
-                     status.getTaskTracker(), null);
-          LOG.info("Failed to copy the output of " + status.getTaskId() + 
-                   " with: " + StringUtils.stringifyException(ioe));
-          return;
-        }
-        
-        if (complete) {
-          tip.setSuccessEventNumber(taskCompletionEventTracker);
-        } else {
-          taskEvent.setTaskStatus(TaskCompletionEvent.Status.KILLED);
-        }
+        tip.setSuccessEventNumber(taskCompletionEventTracker); 
+      }
+      //For a failed task update the JT datastructures.For the task state where
+      //only the COMMIT is pending, delegate everything to the JT thread. For
+      //failed tasks we want the JT to schedule a reexecution ASAP (and not go
+      //via the queue for the datastructures' updates).
+      else if (state == TaskStatus.State.COMMIT_PENDING) {
+        return;
       } else if (state == TaskStatus.State.FAILED ||
                  state == TaskStatus.State.KILLED) {
         // Get the event number for the (possibly) previously successful
@@ -771,7 +766,7 @@
   public synchronized boolean completedTask(TaskInProgress tip, 
                                          TaskStatus status,
                                          JobTrackerMetrics metrics) 
-  throws IOException {
+  {
     String taskid = status.getTaskId();
         
     // Sanity check: is the TIP already complete? 
@@ -928,14 +923,10 @@
                           TaskStatus status, String trackerName,
                           boolean wasRunning, boolean wasComplete,
                           JobTrackerMetrics metrics) {
-    if(status.getRunState() == TaskStatus.State.KILLED ) {
-      tip.taskKilled(taskid, trackerName, this.status);
-    }
-    else {
-      // Mark the taskid as a 'failure'
-      tip.incompleteSubTask(taskid, trackerName, this.status);
-    }
-
+    
+    // Mark the taskid as FAILED or KILLED
+    tip.incompleteSubTask(taskid, trackerName, this.status);
+   
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
         
@@ -1065,7 +1056,7 @@
                                                     reason,
                                                     reason,
                                                     trackerName, phase,
-                                                    tip.getCounters());
+                                                    null);
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
@@ -1176,6 +1167,32 @@
                  TaskStatus.State.FAILED, trackerName, metrics);
       
       mapTaskIdToFetchFailuresMap.remove(mapTaskId);
+    }
+  }
+  
+  static class JobWithTaskContext {
+    private JobInProgress job;
+    private TaskInProgress tip;
+    private String taskId;
+    private JobTrackerMetrics metrics;
+    JobWithTaskContext(JobInProgress job, TaskInProgress tip, 
+        String taskId, JobTrackerMetrics metrics) {
+      this.job = job;
+      this.tip = tip;
+      this.taskId = taskId;
+      this.metrics = metrics;
+    }
+    JobInProgress getJob() {
+      return job;
+    }
+    TaskInProgress getTIP() {
+      return tip;
+    }
+    String getTaskId() {
+      return taskId;
+    }
+    JobTrackerMetrics getJobTrackerMetrics() {
+      return metrics;
     }
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Oct 10 02:32:49 2007
@@ -39,6 +39,7 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -609,6 +610,8 @@
   Path systemDir = null;
   private JobConf conf;
 
+  private Thread taskCommitThread;
+  
   /**
    * Start the JobTracker process, listen on the indicated port
    */
@@ -663,15 +666,6 @@
 
     myMetrics = new JobTrackerMetrics(this, jobConf);
     
-    this.expireTrackersThread = new Thread(this.expireTrackers,
-                                           "expireTrackers");
-    this.expireTrackersThread.start();
-    this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
-    this.retireJobsThread.start();
-    this.initJobsThread = new Thread(this.initJobs, "initJobs");
-    this.initJobsThread.start();
-    expireLaunchingTaskThread.start();
-        
     // The rpc/web-server ports can be ephemeral ports... 
     // ... ensure we have the correct info
     this.port = interTrackerServer.getListenerAddress().getPort();
@@ -726,6 +720,17 @@
    * Run forever
    */
   public void offerService() throws InterruptedException {
+    this.expireTrackersThread = new Thread(this.expireTrackers,
+                                          "expireTrackers");
+    this.expireTrackersThread.start();
+    this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
+    this.retireJobsThread.start();
+    this.initJobsThread = new Thread(this.initJobs, "initJobs");
+    this.initJobsThread.start();
+    expireLaunchingTaskThread.start();
+    this.taskCommitThread = new TaskCommitQueue();
+    this.taskCommitThread.start();
+
     this.interTrackerServer.join();
     LOG.info("Stopped interTrackerServer");
   }
@@ -781,6 +786,16 @@
         ex.printStackTrace();
       }
     }
+    if (this.taskCommitThread != null) {
+      LOG.info("Stopping TaskCommit thread");
+      this.taskCommitThread.interrupt();
+      try {
+        this.taskCommitThread.interrupt();
+        this.taskCommitThread.join();
+      } catch (InterruptedException ex) {
+        ex.printStackTrace();
+      }
+    }
     LOG.info("stopped all jobtracker services");
     return;
   }
@@ -853,7 +868,8 @@
   void markCompletedJob(JobInProgress job) {
     for (TaskInProgress tip : job.getMapTasks()) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
-        if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+        if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskId());
         }
@@ -861,7 +877,8 @@
     }
     for (TaskInProgress tip : job.getReduceTasks()) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
-        if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+        if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskId());
         }
@@ -1836,6 +1853,146 @@
       removeMarkedTasks(trackerName);
     }
   }
+
+  public void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
+    ((TaskCommitQueue)taskCommitThread).addToQueue(j);
+  }
+  //This thread takes care of things like moving outputs to their final
+  //locations & deleting temporary outputs
+  private class TaskCommitQueue extends Thread {
+    
+    private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue = 
+            new LinkedBlockingQueue <JobInProgress.JobWithTaskContext>();
+        
+    public TaskCommitQueue() {
+      setName("Task Commit Thread");
+      setDaemon(true);
+    }
+    
+    public void addToQueue(JobInProgress.JobWithTaskContext j) {
+      while (!queue.add(j)) {
+        LOG.warn("Couldn't add to the Task Commit queue now. Will " +
+                 "try again");
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException ie) {}
+      }
+    }
+       
+    public void run() {
+      while (!isInterrupted()) {
+        JobInProgress.JobWithTaskContext j;
+        try {
+          j = queue.take();
+        } catch (InterruptedException ie) {
+          return;
+        }
+        JobInProgress job = j.getJob();
+        TaskInProgress tip = j.getTIP();
+        String taskid = j.getTaskId();
+        JobTrackerMetrics metrics = j.getJobTrackerMetrics();
+        Task t;
+        TaskStatus status;
+        boolean isTipComplete = false;
+        TaskStatus.State state;
+        synchronized (JobTracker.this) {
+          synchronized (job) {
+            synchronized (tip) {
+              status = tip.getTaskStatus(taskid);
+              t = tip.getTaskObject(taskid);
+              state = status.getRunState();
+              isTipComplete = tip.isComplete();
+            }
+          }
+        }
+        try {
+          //For COMMIT_PENDING tasks, we save the task output in the dfs
+          //as well as manipulate the JT datastructures to reflect a
+          //successful task. This guarantees that we don't declare a task
+          //as having succeeded until we have successfully completed the
+          //dfs operations.
+          //For failed tasks, we just do the dfs operations here. The
+          //datastructures updates is done earlier as soon as the failure
+          //is detected so that the JT can immediately schedule another
+          //attempt for that task.
+          if (state == TaskStatus.State.COMMIT_PENDING) {
+            if (!isTipComplete) {
+              t.saveTaskOutput();
+            }
+            synchronized (JobTracker.this) {
+              //do a check for the case where after the task went to
+              //COMMIT_PENDING, it was lost. So although we would have
+              //saved the task output, we cannot declare it a SUCCESS.
+              TaskStatus newStatus = null;
+              synchronized (job) {
+                synchronized (tip) {
+                  status = tip.getTaskStatus(taskid);
+                  if (!isTipComplete) {
+                    if (status.getRunState() != 
+                         TaskStatus.State.COMMIT_PENDING) {
+                      state = TaskStatus.State.KILLED;
+                    } else {
+                      state = TaskStatus.State.SUCCEEDED;
+                    }
+                  } else {
+                    tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
+                                                        "TIP");
+                    state = TaskStatus.State.KILLED;
+
+                  }
+                  //create new status if required. If the state changed from
+                  //COMMIT_PENDING to KILLED in the JobTracker, while we were
+                  //saving the output,the JT would have called updateTaskStatus
+                  //and we don't need to call it again
+                  if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
+                    newStatus = TaskStatus.createTaskStatus(
+                        tip.isMapTask(), 
+                        taskid,
+                        state == TaskStatus.State.SUCCEEDED ? 1.0f : 0.0f,
+                            state,
+                            status.getDiagnosticInfo(),
+                            status.getStateString(),
+                            status.getTaskTracker(), status.getPhase(),
+                            status.getCounters());
+                  }
+                }
+                if (newStatus != null) {
+                  job.updateTaskStatus(tip, newStatus, metrics);
+                }
+              }
+            }
+          }
+        } catch (IOException ioe) {
+          // Oops! Failed to copy the task's output to its final place;
+          // fail the task!
+          state = TaskStatus.State.FAILED;
+          synchronized (JobTracker.this) {
+            job.failedTask(tip, status.getTaskId(), 
+                     "Failed to rename output with the exception: " + 
+                     StringUtils.stringifyException(ioe), 
+                     (tip.isMapTask() ? 
+                         TaskStatus.Phase.MAP : 
+                         TaskStatus.Phase.REDUCE), 
+                     TaskStatus.State.FAILED,  
+                         status.getTaskTracker(), null);
+          }
+          LOG.info("Failed to rename the output of " + status.getTaskId() + 
+                   " with: " + StringUtils.stringifyException(ioe));
+        }
+        if (state == TaskStatus.State.FAILED || 
+            state == TaskStatus.State.KILLED) {
+          try {
+            t.discardTaskOutput();
+          } catch (IOException ioe) { 
+            LOG.info("Failed to discard the output of task " + 
+                status.getTaskId() + " with: " + 
+                StringUtils.stringifyException(ioe));
+          }
+        }
+      }
+    }
+  }
+  
 
   /**
    * Get the localized job file path on the job trackers local file system

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Oct 10 02:32:49 2007
@@ -438,21 +438,32 @@
   
   private Path getFinalPath(Path jobOutputDir, Path taskOutput) {
     URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri());
-    return new Path(jobOutputDir, relativePath.getPath());
+    if (relativePath.getPath().length() > 0) {
+      return new Path(jobOutputDir, relativePath.getPath());
+    } else {
+      return jobOutputDir;
+    }
   }
   
   private void moveTaskOutputs(FileSystem fs, Path jobOutputDir, Path taskOutput) 
   throws IOException {
     if (fs.isFile(taskOutput)) {
       Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
-      fs.mkdirs(finalOutputPath.getParent());
       if (!fs.rename(taskOutput, finalOutputPath)) {
-        throw new IOException("Failed to save output of task: " + 
-                getTaskId());
+        if (!fs.delete(finalOutputPath)) {
+          throw new IOException("Failed to delete earlier output of task: " + 
+                  getTaskId());
+        }
+        if (!fs.rename(taskOutput, finalOutputPath)) {
+          throw new IOException("Failed to save output of task: " + 
+                  getTaskId());
+        }
       }
       LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
     } else if(fs.isDirectory(taskOutput)) {
       Path[] paths = fs.listPaths(taskOutput);
+      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
+      fs.mkdirs(finalOutputPath);
       if (paths != null) {
         for (Path path : paths) {
           moveTaskOutputs(fs, jobOutputDir, path);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Oct 10 02:32:49 2007
@@ -104,7 +104,6 @@
 
   // Map from taskId -> Task
   private Map<String, Task> tasks = new TreeMap<String, Task>();
-  boolean savedTaskOutput = false;
 
   private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
   private TreeSet<String> tasksReportedClosed = new TreeSet<String>();
@@ -185,6 +184,15 @@
     return partition;
   }    
 
+  public boolean isOnlyCommitPending() {
+    for (TaskStatus t : taskStatuses.values()) {
+      if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
   /**
    * Initialization common to Map and Reduce
    */
@@ -219,6 +227,15 @@
   }
     
   /**
+   * Return the Task object associated with a taskId
+   * @param taskId
+   * @return
+   */  
+  public Task getTaskObject(String taskId) {
+    return tasks.get(taskId);
+  }
+  
+  /**
    * Is this tip currently running any tasks?
    * @return true if any tasks are running
    */
@@ -231,7 +248,7 @@
    * 
    * @return <code>true</code> if the tip is complete, else <code>false</code>
    */
-  public boolean isComplete() {
+  public synchronized boolean isComplete() {
     return (completes > 0);
   }
 
@@ -350,7 +367,7 @@
    * @param taskId id of the task 
    * @param diagInfo diagnostic information for the task
    */
-  private void addDiagnosticInfo(String taskId, String diagInfo) {
+  public void addDiagnosticInfo(String taskId, String diagInfo) {
     List<String> diagHistory = taskDiagnosticData.get(taskId);
     if (diagHistory == null) {
       diagHistory = new ArrayList<String>();
@@ -396,7 +413,8 @@
       if (newState == TaskStatus.State.RUNNING &&
           (oldState == TaskStatus.State.FAILED || 
            oldState == TaskStatus.State.KILLED || 
-           oldState == TaskStatus.State.SUCCEEDED)) {
+           oldState == TaskStatus.State.SUCCEEDED ||
+           oldState == TaskStatus.State.COMMIT_PENDING)) {
         return false;
       }
           
@@ -419,10 +437,18 @@
     //
     // Note the failure and its location
     //
-    LOG.info("Task '" + taskid + "' has been lost.");
     TaskStatus status = taskStatuses.get(taskid);
     TaskStatus.State taskState = TaskStatus.State.FAILED;
     if (status != null) {
+      // Check if the user manually KILLED/FAILED this task-attempt...
+      Boolean shouldFail = tasksToKill.remove(taskid);
+      if (shouldFail != null) {
+        taskState = (shouldFail) ? TaskStatus.State.FAILED :
+                                   TaskStatus.State.KILLED;
+        status.setRunState(taskState);
+        addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
+      }
+ 
       taskState = status.getRunState();
       if (taskState != TaskStatus.State.FAILED && 
               taskState != TaskStatus.State.KILLED) {
@@ -441,24 +467,18 @@
     this.activeTasks.remove(taskid);
     
     // Since we do not fail completed reduces (whose outputs go to hdfs), we 
-    // should note this failure only for completed maps; however if the job
-    // is done, there is no need to manipulate completed maps
-    if (this.completes > 0 && this.isMapTask() && 
+    // should note this failure only for completed maps, only if this taskid;
+    // completed this map. however if the job is done, there is no need to 
+    // manipulate completed maps
+    if (this.isMapTask() && isComplete(taskid) && 
         jobStatus.getRunState() != JobStatus.SUCCEEDED) {
       this.completes--;
     }
 
-    // Discard task output
-    Task t = tasks.get(taskid);
-    try {
-      t.discardTaskOutput();
-    } catch (IOException ioe) {
-      LOG.info("Failed to discard output of task '" + taskid + "' with " + 
-              StringUtils.stringifyException(ioe));
-    }
 
     if (taskState == TaskStatus.State.FAILED) {
       numTaskFailures++;
+      machinesWhereFailed.add(trackerName);
     } else {
       numKilledTasks++;
     }
@@ -467,7 +487,6 @@
       LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
       kill();
     }
-    machinesWhereFailed.add(trackerName);
   }
 
   /**
@@ -490,14 +509,6 @@
    * taskid as {@link TaskStatus.State.KILLED}. 
    */
   void alreadyCompletedTask(String taskid) {
-    Task t = tasks.get(taskid);
-    try {
-      t.discardTaskOutput();
-    } catch (IOException ioe) {
-      LOG.info("Failed to discard output of task '" + taskid + "' with " + 
-              StringUtils.stringifyException(ioe));
-    }
-
     // 'KILL' the task 
     completedTask(taskid, TaskStatus.State.KILLED);
     
@@ -512,29 +523,11 @@
    * Indicate that one of the taskids in this TaskInProgress
    * has successfully completed!
    */
-  public void completed(String taskid) throws IOException {
-    //
-    // Finalize the task's output
-    //
-    Task t = tasks.get(taskid);
-    if (!savedTaskOutput) {
-      t.saveTaskOutput();
-      savedTaskOutput = true;
-    } else {
-      try {
-        t.discardTaskOutput();
-      } catch (IOException ioe) {
-        LOG.info("Failed to discard 'already-saved' output of task: " + 
-                t.getTaskId() + " with: " + 
-                StringUtils.stringifyException(ioe));
-      }
-    }
-
+  public void completed(String taskid) {
     //
     // Record that this taskid is complete
     //
     completedTask(taskid, TaskStatus.State.SUCCEEDED);
-    
         
     //
     // Now that the TIP is complete, the other speculative 
@@ -545,7 +538,6 @@
     this.completes++;
     recomputeProgress();
     
-    LOG.info("Task '" + taskid + "' has completed succesfully");
   }
 
   /**
@@ -588,7 +580,8 @@
    */
   boolean killTask(String taskId, boolean shouldFail) {
     TaskStatus st = taskStatuses.get(taskId);
-    if(st != null && st.getRunState() == TaskStatus.State.RUNNING
+    if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
+        || st.getRunState() == TaskStatus.State.COMMIT_PENDING)
         && tasksToKill.put(taskId, shouldFail) == null ) {
       String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 
                       + " task '" + taskId + "' by user";
@@ -599,32 +592,6 @@
     return false;
   }
 
-  /** Notification that a task with the given id has been killed */
-  void taskKilled(String taskId, String trackerName, JobStatus jobStatus) {
-    Boolean shouldFail = tasksToKill.remove(taskId);
-    if(shouldFail != null && !shouldFail) {
-      LOG.info("Task '" + taskId + "' has been killed");
-      this.activeTasks.remove(taskId);
-      taskStatuses.get(taskId).setRunState(TaskStatus.State.KILLED );
-      addDiagnosticInfo(taskId, "Task has been killed" );
-      // Discard task output
-      Task t = tasks.get(taskId);
-      try {
-        t.discardTaskOutput();
-      } catch (IOException ioe) {
-        LOG.info("Failed to discard output of task '" + taskId + "' with " +
-            StringUtils.stringifyException(ioe));
-      }
-      numKilledTasks++;
-      
-    }
-    else {
-      //set the task status as failed. 
-      taskStatuses.get(taskId).setRunState(TaskStatus.State.FAILED);
-      incompleteSubTask(taskId, trackerName, jobStatus);
-    }
-  }
-
   /**
    * This method is called whenever there's a status change
    * for one of the TIP's sub-tasks.  It recomputes the overall 
@@ -650,6 +617,12 @@
           bestState = status.getStateString();
           bestCounters = status.getCounters();
           break;
+        } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          //for COMMIT_PENDING, we take the last state that we recorded
+          //when the task was RUNNING
+          bestProgress = this.progress;
+          bestState = this.state;
+          bestCounters = this.counters;
         } else if (status.getRunState() == TaskStatus.State.RUNNING) {
           if (status.getProgress() >= bestProgress) {
             bestProgress = status.getProgress();
@@ -692,7 +665,7 @@
         runSpeculative &&
         (averageProgress - progress >= SPECULATIVE_GAP) &&
         (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG) 
-        && completes == 0) {
+        && completes == 0 && !isOnlyCommitPending()) {
       return true;
     }
     return false;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Wed Oct 10 02:32:49 2007
@@ -40,7 +40,8 @@
   public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
 
   // what state is the task in?
-  public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED}
+  public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
+                            COMMIT_PENDING}
     
   private String taskid;
   private float progress;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Oct 10 02:32:49 2007
@@ -1368,7 +1368,7 @@
      * The task is reporting that it's done running
      */
     public synchronized void reportDone() {
-      this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      this.taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
@@ -1400,7 +1400,7 @@
       boolean needCleanup = false;
       synchronized (this) {
         if (done) {
-          taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+          taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
         } else {
           if (!wasKilled) {
             failures += 1;
@@ -1477,7 +1477,10 @@
      */
     private synchronized void mapOutputLost(String failure
                                            ) throws IOException {
-      if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) {
+      //The check for COMMIT_PENDING should actually be a check for SUCCESS
+      //however for that, we have to introduce another Action type from the
+      //JT to the TT (SuccessTaskAction in the lines of KillTaskAction).
+      if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         // change status to failure
         LOG.info("Reporting output lost:"+task.getTaskId());
         taskStatus.setRunState(TaskStatus.State.FAILED);

Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Wed Oct 10 02:32:49 2007
@@ -155,7 +155,7 @@
         }
         out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
           + "&tipid=" + tipid + "&taskid=" + status.getTaskId() + "\">"
-          + status.getCounters().size() + "</a></td>");
+          + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");
         out.print("<td>");
         if (privateActions
           && status.getRunState() == TaskStatus.State.RUNNING) {