You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/05 12:50:06 UTC

svn commit: r692408 [2/2] - in /hadoop/core/trunk: ./ src/docs/src/documentation/content/xdocs/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Sep  5 03:50:04 2008
@@ -21,7 +21,6 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.net.URI;
 import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -34,8 +33,6 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -48,7 +45,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
@@ -110,8 +106,8 @@
   private String jobFile;                         // job configuration file
   private TaskAttemptID taskId;                          // unique, includes job id
   private int partition;                          // id within job
-  TaskStatus taskStatus; 										      // current status of the task
-  private Path taskOutputPath;                    // task-specific output dir
+  TaskStatus taskStatus;                          // current status of the task
+  protected boolean cleanupJob = false;
   
   //failed ranges from previous attempts
   private SortedRanges failedRanges = new SortedRanges();
@@ -125,6 +121,10 @@
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
   protected LocalDirAllocator lDirAlloc;
+  private final static int MAX_RETRIES = 10;
+  protected JobContext jobContext;
+  protected TaskAttemptContext taskContext;
+  private volatile boolean commitPending = false;
 
   ////////////////////////////////////////////
   // Constructors
@@ -220,6 +220,13 @@
     this.skipping = skipping;
   }
 
+  /**
+   * Sets whether the task is cleanup task
+   */
+  public void setCleanupTask() {
+    cleanupJob = true;
+  }
+
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -228,48 +235,27 @@
     Text.writeString(out, jobFile);
     taskId.write(out);
     out.writeInt(partition);
-    if (taskOutputPath != null) {
-      Text.writeString(out, taskOutputPath.toString());
-    } else {
-      Text.writeString(out, "");
-    }
     taskStatus.write(out);
     failedRanges.write(out);
     out.writeBoolean(skipping);
+    out.writeBoolean(cleanupJob);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
     taskId = TaskAttemptID.read(in);
     partition = in.readInt();
-    String outPath = Text.readString(in);
-    if (outPath.length() != 0) {
-      taskOutputPath = new Path(outPath);
-    } else {
-      taskOutputPath = null;
-    }
     taskStatus.readFields(in);
     this.mapOutputFile.setJobId(taskId.getJobID()); 
     failedRanges.readFields(in);
     currentRecIndexIterator = failedRanges.skipRangeIterator();
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
+    cleanupJob = in.readBoolean();
   }
 
   @Override
   public String toString() { return taskId.toString(); }
 
-  private Path getTaskOutputPath(JobConf conf) {
-    Path p = new Path(FileOutputFormat.getOutputPath(conf), 
-      (MRConstants.TEMP_DIR_NAME + Path.SEPARATOR + "_" + taskId));
-    try {
-      FileSystem fs = p.getFileSystem(conf);
-      return p.makeQualified(fs);
-    } catch (IOException ie) {
-      LOG.warn(StringUtils.stringifyException(ie));
-      return p;
-    }
-  }
-  
   /**
    * Localize the given JobConf to be specific for this task.
    */
@@ -279,12 +265,18 @@
     conf.setBoolean("mapred.task.is.map", isMapTask());
     conf.setInt("mapred.task.partition", partition);
     conf.set("mapred.job.id", taskId.getJobID().toString());
-    
-    // The task-specific output path
-    if (FileOutputFormat.getOutputPath(conf) != null) {
-      taskOutputPath = getTaskOutputPath(conf);
-      FileOutputFormat.setWorkOutputPath(conf, taskOutputPath);
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      OutputCommitter committer = conf.getOutputCommitter();
+      if ((committer instanceof FileOutputCommitter)) {
+        TaskAttemptContext context = new TaskAttemptContext(conf, taskId);
+        FileOutputFormat.setWorkOutputPath(conf, 
+          ((FileOutputCommitter)committer).getTempTaskOutputPath(context));
+      } else {
+        FileOutputFormat.setWorkOutputPath(conf, outputPath);
+      }
     }
+
   }
   
   /** Run this task as a part of the named job.  This method is executed in the
@@ -359,8 +351,17 @@
               if (sendProgress) {
                 // we need to send progress update
                 updateCounters();
-                taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), 
-                        counters);
+                if (commitPending) {
+                  taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
+                                          taskProgress.get(),
+                                          taskProgress.toString(), 
+                                          counters);
+                } else {
+                  taskStatus.statusUpdate(TaskStatus.State.RUNNING,
+                                          taskProgress.get(),
+                                          taskProgress.toString(), 
+                                          counters);
+                }
                 taskFound = umbilical.statusUpdate(taskId, taskStatus);
                 taskStatus.clearStatus();
               }
@@ -396,6 +397,13 @@
     LOG.debug(getTaskID() + " Progress/ping thread started");
   }
 
+  public void initialize(JobConf job, Reporter reporter) 
+  throws IOException {
+    jobContext = new JobContext(job, reporter);
+    taskContext = new TaskAttemptContext(job, taskId, reporter);
+    OutputCommitter committer = conf.getOutputCommitter();
+    committer.setupTask(taskContext);
+  }
   
   protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) 
     throws IOException 
@@ -543,54 +551,86 @@
   }
 
   public void done(TaskUmbilicalProtocol umbilical) throws IOException {
-    int retries = 10;
-    boolean needProgress = true;
+    LOG.info("Task:" + taskId + " is done."
+             + " And is in the process of commiting");
     updateCounters();
+
+    OutputCommitter outputCommitter = conf.getOutputCommitter();
+    // check whether the commit is required.
+    boolean commitRequired = outputCommitter.needsTaskCommit(taskContext);
+    if (commitRequired) {
+      int retries = MAX_RETRIES;
+      taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
+      commitPending = true;
+      // say the task tracker that task is commit pending
+      while (true) {
+        try {
+          umbilical.commitPending(taskId, taskStatus);
+          break;
+        } catch (InterruptedException ie) {
+          // ignore
+        } catch (IOException ie) {
+          LOG.warn("Failure sending commit pending: " + 
+                    StringUtils.stringifyException(ie));
+          if (--retries == 0) {
+            System.exit(67);
+          }
+        }
+      }
+      //wait for commit approval and commit
+      commit(umbilical, outputCommitter);
+    }
     taskDone.set(true);
+    sendLastUpdate(umbilical);
+    //signal the tasktracker that we are done
+    sendDone(umbilical);
+  }
+
+  private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    //first wait for the COMMIT approval from the tasktracker
+    int retries = MAX_RETRIES;
     while (true) {
       try {
-        if (needProgress) {
-          // send a final status report
-          taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), 
+        // send a final status report
+        if (commitPending) {
+          taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
+                                  taskProgress.get(),
+                                  taskProgress.toString(), 
+                                  counters);
+        } else {
+          taskStatus.statusUpdate(TaskStatus.State.RUNNING,
+                                  taskProgress.get(),
+                                  taskProgress.toString(), 
                                   counters);
-          try {
-            if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
-              LOG.warn("Parent died.  Exiting "+taskId);
-              System.exit(66);
-            }
-            taskStatus.clearStatus();
-            needProgress = false;
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();       // interrupt ourself
-          }
         }
-        // Check whether there is any task output
-        boolean shouldBePromoted = false;
+
         try {
-          if (taskOutputPath != null) {
-            // Get the file-system for the task output directory
-            FileSystem fs = taskOutputPath.getFileSystem(conf);
-            if (fs.exists(taskOutputPath)) {
-              // Get the summary for the folder
-              ContentSummary summary = fs.getContentSummary(taskOutputPath);
-              // Check if the directory contains data to be promoted
-              // i.e total-files + total-folders - 1(itself)
-              if (summary != null 
-                  && (summary.getFileCount() + summary.getDirectoryCount() - 1)
-                      > 0) {
-                shouldBePromoted = true;
-              }
-            } else {
-              LOG.info(getTaskID() + ": No outputs to promote from " + 
-                       taskOutputPath);
-            }
+          if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+            LOG.warn("Parent died.  Exiting "+taskId);
+            System.exit(66);
           }
-        } catch (IOException ioe) {
-          // To be safe in case of an exception
-          shouldBePromoted = true;
+          taskStatus.clearStatus();
+          return;
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt(); // interrupt ourself
         }
-        umbilical.done(taskId, shouldBePromoted);
-        LOG.info("Task '" + getTaskID() + "' done.");
+      } catch (IOException ie) {
+        LOG.warn("Failure sending last status update: " + 
+                  StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          throw ie;
+        }
+      }
+    }
+  }
+
+  private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
+    int retries = MAX_RETRIES;
+    while (true) {
+      try {
+        umbilical.done(getTaskID());
+        LOG.info("Task '" + taskId + "' done.");
         return;
       } catch (IOException ie) {
         LOG.warn("Failure signalling completion: " + 
@@ -601,15 +641,66 @@
       }
     }
   }
+
+  private void commit(TaskUmbilicalProtocol umbilical,
+                      OutputCommitter committer) throws IOException {
+    int retries = MAX_RETRIES;
+    while (true) {
+      try {
+        while (!umbilical.canCommit(taskId)) {
+          try {
+            Thread.sleep(1000);
+          } catch(InterruptedException ie) {
+            //ignore
+          }
+          setProgressFlag();
+        }
+        // task can Commit now  
+        try {
+          LOG.info("Task " + taskId + " is allowed to commit now");
+          committer.commitTask(taskContext);
+          return;
+        } catch (IOException iee) {
+          LOG.warn("Failure committing: " + 
+                    StringUtils.stringifyException(iee));
+          discardOutput(taskContext, committer);
+          throw iee;
+        }
+      } catch (IOException ie) {
+        LOG.warn("Failure asking whether task can commit: " + 
+            StringUtils.stringifyException(ie));
+        if (--retries == 0) {
+          //if it couldn't commit a successfully then delete the output
+          discardOutput(taskContext, committer);
+          System.exit(68);
+        }
+      }
+    }
+  }
+
+  private void discardOutput(TaskAttemptContext taskContext,
+                             OutputCommitter committer) {
+    try {
+      committer.abortTask(taskContext);
+    } catch (IOException ioe)  {
+      LOG.warn("Failure cleaning up: " + 
+               StringUtils.stringifyException(ioe));
+    }
+  }
+
+  protected void runCleanup(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // set phase for this task
+    setPhase(TaskStatus.Phase.CLEANUP);
+    getProgress().setStatus("cleanup");
+    // do the cleanup
+    conf.getOutputCommitter().cleanupJob(jobContext);
+    done(umbilical);
+  }
   
   public void setConf(Configuration conf) {
     if (conf instanceof JobConf) {
       this.conf = (JobConf) conf;
-
-      if (taskId != null && taskOutputPath == null && 
-          FileOutputFormat.getOutputPath(this.conf) != null) {
-        taskOutputPath = getTaskOutputPath(this.conf);
-      }
     } else {
       this.conf = new JobConf(conf);
     }
@@ -633,68 +724,6 @@
   }
 
   /**
-   * Save the task's output on successful completion.
-   * 
-   * @throws IOException
-   */
-  void saveTaskOutput() throws IOException {
-
-    if (taskOutputPath != null) {
-      FileSystem fs = taskOutputPath.getFileSystem(conf);
-      if (fs.exists(taskOutputPath)) {
-        Path jobOutputPath = taskOutputPath.getParent().getParent();
-
-        // Move the task outputs to their final place
-        moveTaskOutputs(fs, jobOutputPath, taskOutputPath);
-
-        // Delete the temporary task-specific output directory
-        if (!fs.delete(taskOutputPath, true)) {
-          LOG.info("Failed to delete the temporary output directory of task: " + 
-                  getTaskID() + " - " + taskOutputPath);
-        }
-        
-        LOG.info("Saved output of task '" + getTaskID() + "' to " + jobOutputPath);
-      }
-    }
-  }
-  
-  private Path getFinalPath(Path jobOutputDir, Path taskOutput) {
-    URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri());
-    if (relativePath.getPath().length() > 0) {
-      return new Path(jobOutputDir, relativePath.getPath());
-    } else {
-      return jobOutputDir;
-    }
-  }
-  
-  private void moveTaskOutputs(FileSystem fs, Path jobOutputDir, Path taskOutput) 
-  throws IOException {
-    if (fs.isFile(taskOutput)) {
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
-      if (!fs.rename(taskOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of task: " + 
-                  getTaskID());
-        }
-        if (!fs.rename(taskOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of task: " + 
-                  getTaskID());
-        }
-      }
-      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-    } else if(fs.isDirectory(taskOutput)) {
-      FileStatus[] paths = fs.listStatus(taskOutput);
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
-      fs.mkdirs(finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveTaskOutputs(fs, jobOutputDir, path.getPath());
-        }
-      }
-    }
-  }
-
-  /**
    * OutputCollector for the combiner.
    */
   protected static class CombineOutputCollector<K extends Object, V extends Object> 

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java?rev=692408&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java Fri Sep  5 03:50:04 2008
@@ -0,0 +1,56 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.Progressable;
+
+public class TaskAttemptContext extends JobContext {
+
+  private JobConf conf;
+  private TaskAttemptID taskid;
+  
+  TaskAttemptContext(JobConf conf, TaskAttemptID taskid) {
+    this(conf, taskid, Reporter.NULL);
+  }
+  
+  TaskAttemptContext(JobConf conf, TaskAttemptID taskid,
+                     Progressable progress) {
+    super(conf, progress);
+    this.conf = conf;
+    this.taskid = taskid;
+  }
+  
+  /**
+   * Get the taskAttemptID.
+   *  
+   * @return TaskAttemptID
+   */
+  public TaskAttemptID getTaskAttemptID() {
+    return taskid;
+  }
+  
+  /**
+   * Get the job Configuration.
+   * 
+   * @return JobConf
+   */
+  public JobConf getJobConf() {
+    return conf;
+  }
+
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Sep  5 03:50:04 2008
@@ -79,6 +79,7 @@
   private boolean killed = false;
   private volatile SortedRanges failedRanges = new SortedRanges();
   private volatile boolean skipping = false;
+  private boolean cleanup = false; 
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -107,6 +108,9 @@
   //list of tasks to kill, <taskid> -> <shouldFail> 
   private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
   
+  //task to commit, <taskattemptid>  
+  private TaskAttemptID taskToCommit;
+  
   private Counters counters = new Counters();
   
 
@@ -164,6 +168,14 @@
     return partition;
   }    
 
+  public boolean isCleanupTask() {
+   return cleanup;
+  }
+  
+  public void setCleanupTask() {
+    cleanup = true;
+  }
+  
   public boolean isOnlyCommitPending() {
     for (TaskStatus t : taskStatuses.values()) {
       if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
@@ -172,6 +184,14 @@
     }
     return false;
   }
+ 
+  public boolean isCommitPending(TaskAttemptID taskId) {
+    TaskStatus t = taskStatuses.get(taskId);
+    if (t == null) {
+      return false;
+    }
+    return t.getRunState() ==  TaskStatus.State.COMMIT_PENDING;
+  }
   
   /**
    * Initialization common to Map and Reduce
@@ -324,6 +344,10 @@
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
+    } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
+               !tasksReportedClosed.contains(taskid)) {
+      tasksReportedClosed.add(taskid);
+      close = true; 
     } else {
       close = tasksToKill.keySet().contains(taskid);
     }   
@@ -331,6 +355,21 @@
   }
 
   /**
+   * Commit this task attempt for the tip. 
+   * @param taskid
+   */
+  public void doCommit(TaskAttemptID taskid) {
+    taskToCommit = taskid;
+  }
+
+  /**
+   * Returns whether the task attempt should be committed or not 
+   */
+  public boolean shouldCommit(TaskAttemptID taskid) {
+    return taskToCommit.equals(taskid);
+  }
+
+  /**
    * Creates a "status report" for this task.  Includes the
    * task ID and overall status, plus reports for all the
    * component task-threads that have ever been started.
@@ -401,7 +440,8 @@
       // status update for the same taskid! This is a safety check, 
       // and is addressed better at the TaskTracker to ensure this.
       // @see {@link TaskTracker.transmitHeartbeat()}
-      if ((newState != TaskStatus.State.RUNNING) && 
+      if ((newState != TaskStatus.State.RUNNING && 
+           newState != TaskStatus.State.COMMIT_PENDING ) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
                  "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
@@ -733,6 +773,9 @@
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
+    if (cleanup) {
+      t.setCleanupTask();
+    }
     t.setConf(conf);
     t.setFailedRanges(failedRanges);
     t.setSkipping(skipping);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Sep  5 03:50:04 2008
@@ -436,7 +436,7 @@
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tracker.reportTaskFinished(t.getTaskID());
+      tracker.reportTaskFinished(t.getTaskID(), false);
     }
   }
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Sep  5 03:50:04 2008
@@ -37,7 +37,7 @@
     LogFactory.getLog(TaskStatus.class.getName());
   
   //enumeration for reporting current phase of a task. 
-  public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
+  public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
 
   // what state is the task in?
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
@@ -264,9 +264,11 @@
    * @param phase
    * @param counters
    */
-  synchronized void statusUpdate(float progress, String state, 
+  synchronized void statusUpdate(State runState, 
+                                 float progress,
+                                 String state, 
                                  Counters counters) {
-    setRunState(TaskStatus.State.RUNNING);
+    setRunState(runState);
     setProgress(progress);
     setStateString(state);
     setCounters(counters);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Sep  5 03:50:04 2008
@@ -28,6 +28,7 @@
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -191,6 +192,12 @@
    */  
   private int probe_sample_size = 500;
     
+  /*
+   * A list of commitTaskActions for whom commit response has been received 
+   */
+  private List<TaskAttemptID> commitResponses = 
+            Collections.synchronizedList(new ArrayList<TaskAttemptID>());
+
   private ShuffleServerMetrics shuffleServerMetrics;
   /** This class contains the methods that should be used for metrics-reporting
    * the specific metrics for shuffle. The TaskTracker is actually a server for
@@ -407,8 +414,10 @@
     // RPC initialization
     int max = maxCurrentMapTasks > maxCurrentReduceTasks ? 
                        maxCurrentMapTasks : maxCurrentReduceTasks;
+    //set the num handlers to max*2 since canCommit may wait for the duration
+    //of a heartbeat RPC
     this.taskReportServer =
-      RPC.getServer(this, bindAddress, tmpPort, max, false, this.fConf);
+      RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf);
     this.taskReportServer.start();
 
     // get the assigned address
@@ -957,6 +966,13 @@
           for(TaskTrackerAction action: actions) {
             if (action instanceof LaunchTaskAction) {
               startNewTask((LaunchTaskAction) action);
+            } else if (action instanceof CommitTaskAction) {
+              CommitTaskAction commitAction = (CommitTaskAction)action;
+              if (!commitResponses.contains(commitAction.getTaskID())) {
+                LOG.info("Received commit task action for " + 
+                          commitAction.getTaskID());
+                commitResponses.add(commitAction.getTaskID());
+              }
             } else {
               tasksToCleanup.put(action);
             }
@@ -1072,7 +1088,8 @@
       
     synchronized (this) {
       for (TaskStatus taskStatus : status.getTaskReports()) {
-        if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+        if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
           if (taskStatus.getIsMap()) {
             mapTotal--;
           } else {
@@ -1188,7 +1205,8 @@
   private synchronized void markUnresponsiveTasks() throws IOException {
     long now = System.currentTimeMillis();
     for (TaskInProgress tip: runningTasks.values()) {
-      if (tip.getRunState() == TaskStatus.State.RUNNING) {
+      if (tip.getRunState() == TaskStatus.State.RUNNING ||
+          tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         // Check the per-job timeout interval for tasks;
         // an interval of '0' implies it is never timed-out
         long jobTaskTimeout = tip.getTaskTimeout();
@@ -1308,7 +1326,8 @@
     TaskInProgress killMe = null;
     for (Iterator it = runningTasks.values().iterator(); it.hasNext();) {
       TaskInProgress tip = (TaskInProgress) it.next();
-      if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
+      if ((tip.getRunState() == TaskStatus.State.RUNNING ||
+           tip.getRunState() == TaskStatus.State.COMMIT_PENDING) &&
           !tip.wasKilled) {
                 
         if (killMe == null) {
@@ -1517,7 +1536,6 @@
     private TaskStatus taskStatus; 
     private long taskTimeout;
     private String debugCommand;
-    private boolean shouldPromoteOutput = false;
         
     /**
      */
@@ -1667,7 +1685,8 @@
           "% " + taskStatus.getStateString());
       
       if (this.done || 
-          this.taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+          (this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
         //make sure we ignore progress messages after a task has 
         //invoked TaskUmbilicalProtocol.done() or if the task has been
         //KILLED/FAILED
@@ -1717,15 +1736,8 @@
     /**
      * The task is reporting that it's done running
      */
-    public synchronized void reportDone(boolean shouldPromote) {
-      TaskStatus.State state = null;
-      this.shouldPromoteOutput = shouldPromote;
-      if (shouldPromote) {
-        state = TaskStatus.State.COMMIT_PENDING;
-      } else {
-        state = TaskStatus.State.SUCCEEDED;
-      }
-      this.taskStatus.setRunState(state);
+    public synchronized void reportDone() {
+      this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
@@ -1758,13 +1770,7 @@
       //
       boolean needCleanup = false;
       synchronized (this) {
-        if (done) {
-          if (shouldPromoteOutput) {
-            taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
-          } else {
-            taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
-          }
-        } else {
+        if (!done) {
           if (!wasKilled) {
             failures += 1;
             taskStatus.setRunState(TaskStatus.State.FAILED);
@@ -1960,7 +1966,8 @@
     public void jobHasFinished(boolean wasFailure) throws IOException {
       // Kill the task if it is still running
       synchronized(this){
-        if (getRunState() == TaskStatus.State.RUNNING) {
+        if (getRunState() == TaskStatus.State.RUNNING ||
+            getRunState() == TaskStatus.State.COMMIT_PENDING) {
           kill(wasFailure);
         }
       }
@@ -1974,7 +1981,8 @@
      * @param wasFailure was it a failure (versus a kill request)?
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
-      if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
+      if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
+          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
@@ -2136,13 +2144,33 @@
   }
 
   /**
+   * Task is reporting that it is in commit_pending
+   * and it is waiting for the commit Response
+   */
+  public synchronized void commitPending(TaskAttemptID taskid,
+                                         TaskStatus taskStatus) 
+  throws IOException {
+    LOG.info("Task " + taskid + " is in COMMIT_PENDING");
+    statusUpdate(taskid, taskStatus);
+    reportTaskFinished(taskid, true);
+  }
+  
+  /**
+   * Child checking whether it can commit 
+   */
+  public synchronized boolean canCommit(TaskAttemptID taskid) {
+    return commitResponses.contains(taskid); //don't remove it now
+  }
+  
+  /**
    * The task is done.
    */
-  public synchronized void done(TaskAttemptID taskid, boolean shouldPromote) 
+  public synchronized void done(TaskAttemptID taskid) 
   throws IOException {
     TaskInProgress tip = tasks.get(taskid);
+    commitResponses.remove(taskid);
     if (tip != null) {
-      tip.reportDone(shouldPromote);
+      tip.reportDone();
     } else {
       LOG.warn("Unknown child task done: "+taskid+". Ignored.");
     }
@@ -2196,13 +2224,15 @@
   /**
    * The task is no longer running.  It may not have completed successfully
    */
-  void reportTaskFinished(TaskAttemptID taskid) {
+  void reportTaskFinished(TaskAttemptID taskid, boolean commitPending) {
     TaskInProgress tip;
     synchronized (this) {
       tip = tasks.get(taskid);
     }
     if (tip != null) {
-      tip.taskFinished();
+      if (!commitPending) {
+        tip.taskFinished();
+      }
       synchronized(finishedCount) {
         finishedCount[0]++;
         finishedCount.notify();
@@ -2331,7 +2361,7 @@
       TaskStatus status = tip.getStatus();
       status.setIncludeCounters(sendCounters);
       status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf));
-      // send counters for finished or failed tasks.
+      // send counters for finished or failed tasks and commit pending tasks
       if (status.getRunState() != TaskStatus.State.RUNNING) {
         status.setIncludeCounters(true);
       }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Fri Sep  5 03:50:04 2008
@@ -48,7 +48,10 @@
     KILL_JOB,
     
     /** Reinitialize the tasktracker. */
-    REINIT_TRACKER
+    REINIT_TRACKER,
+
+    /** Ask a task to save its output. */
+    COMMIT_TASK
   };
   
   /**
@@ -80,6 +83,11 @@
         action = new ReinitTrackerAction();
       }
       break;
+    case COMMIT_TASK:
+      {
+        action = new CommitTaskAction();
+      }
+      break;
     }
 
     return action;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Sep  5 03:50:04 2008
@@ -207,7 +207,8 @@
       TaskStatus.State state = ts.getRunState();
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           (state == TaskStatus.State.COMMIT_PENDING))) {
         mapCount++;
       }
     }
@@ -224,7 +225,8 @@
       TaskStatus.State state = ts.getRunState();
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           (state == TaskStatus.State.COMMIT_PENDING))) {
         reduceCount++;
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Sep  5 03:50:04 2008
@@ -45,9 +45,10 @@
    * Version 9 changes the counter representation for HADOOP-1915
    * Version 10 changed the TaskStatus format and added reportNextRecordRange
    *            for HADOOP-153
+   * Version 11 Adds RPCs for task commit as part of HADOOP-3150
    * */
 
-  public static final long versionID = 10L;
+  public static final long versionID = 11L;
   
   /** Called when a child task process starts, to get its task.*/
   Task getTask(TaskAttemptID taskid) throws IOException;
@@ -88,9 +89,26 @@
   /** Report that the task is successfully completed.  Failure is assumed if
    * the task process exits without calling this.
    * @param taskid task's id
-   * @param shouldBePromoted whether to promote the task's output or not 
    */
-  void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException;
+  void done(TaskAttemptID taskid) throws IOException;
+  
+  /** 
+   * Report that the task is complete, but its commit is pending.
+   * 
+   * @param taskId task's id
+   * @param taskStatus status of the child
+   * @throws IOException
+   */
+  void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) 
+  throws IOException, InterruptedException;  
+
+  /**
+   * Polling to know whether the task can go-ahead with commit 
+   * @param taskid
+   * @return true/false 
+   * @throws IOException
+   */
+  boolean canCommit(TaskAttemptID taskid) throws IOException;
 
   /** Report that a reduce-task couldn't shuffle map-outputs.*/
   void shuffleError(TaskAttemptID taskId, String message) throws IOException;

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=692408&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java Fri Sep  5 03:50:04 2008
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+
+public class TestFileOutputCommitter extends TestCase {
+  private static Path outDir = new Path(
+     System.getProperty("test.build.data", "."), "output");
+
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
+  private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+
+  @SuppressWarnings("unchecked")
+  public void testCommitter() throws Exception {
+    JobConf job = new JobConf();
+    job.set("mapred.task.id", attempt);
+    job.setOutputCommitter(FileOutputCommitter.class);
+    JobContext jContext = new JobContext(job);
+    TaskAttemptContext tContext = new TaskAttemptContext(job, taskID);
+    FileOutputFormat.setOutputPath(job, outDir);
+    FileOutputCommitter committer = new FileOutputCommitter();
+    FileOutputFormat.setWorkOutputPath(job, 
+      committer.getTempTaskOutputPath(tContext));
+
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+    String file = "test.txt";
+
+    // A reporter that does nothing
+    Reporter reporter = Reporter.NULL;
+    FileSystem localFs = FileSystem.getLocal(job);
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter =
+      theOutputFormat.getRecordWriter(localFs, job, file, reporter);
+    Text key1 = new Text("key1");
+    Text key2 = new Text("key2");
+    Text val1 = new Text("val1");
+    Text val2 = new Text("val2");
+    NullWritable nullWritable = NullWritable.get();
+
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(reporter);
+    }
+    committer.commitTask(tContext);
+    committer.cleanupJob(jContext);
+    
+    File expectedFile = new File(new Path(outDir, file).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = UtilsForTests.slurp(expectedFile);
+    assertEquals(output, expectedOutput.toString());
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestFileOutputCommitter().testCommitter();
+  }
+}

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Fri Sep  5 03:50:04 2008
@@ -74,6 +74,8 @@
       assertEquals("number of maps", 1, reports.length);
       reports = client.getReduceTaskReports(jobid);
       assertEquals("number of reduces", 1, reports.length);
+      reports = client.getCleanupTaskReports(jobid);
+      assertEquals("number of cleanups", 2, reports.length);
       Counters counters = ret.job.getCounters();
       assertEquals("number of map inputs", 3, 
                    counters.getCounter(Task.Counter.MAP_INPUT_RECORDS));

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java Fri Sep  5 03:50:04 2008
@@ -38,12 +38,14 @@
     }
   }
 
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
+
   private static Path workDir = 
     new Path(new Path(
                       new Path(System.getProperty("test.build.data", "."), 
                                "data"), 
-                      MRConstants.TEMP_DIR_NAME), 
-             "TestMultipleTextOutputFormat");
+                      FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
 
   private static void writeData(RecordWriter<Text, Text> rw) throws IOException {
     for (int i = 10; i < 40; i++) {
@@ -84,6 +86,7 @@
   
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
+    job.set("mapred.task.id", attempt);
     FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Fri Sep  5 03:50:04 2008
@@ -230,6 +230,13 @@
         }
       }
       rjob.killJob();
+      while(rjob.cleanupProgress() == 0.0f) {
+        try {
+          Thread.sleep(10);  
+        } catch (InterruptedException ie) {
+          break;
+        }
+      }
       if (shouldSucceed) {
         assertTrue(rjob.isComplete());
       } else {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java Fri Sep  5 03:50:04 2008
@@ -23,7 +23,6 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 
 import junit.framework.TestCase;
@@ -34,6 +33,8 @@
       LogFactory.getLog(TestSequenceFileAsBinaryOutputFormat.class.getName());
 
   private static final int RECORDS = 10000;
+  // A random task attempt id for testing.
+  private static final String attempt = "attempt_200707121733_0001_m_000000_0";
 
   public void testBinary() throws IOException {
     JobConf job = new JobConf();
@@ -41,8 +42,7 @@
     
     Path dir = 
       new Path(new Path(new Path(System.getProperty("test.build.data",".")), 
-                        MRConstants.TEMP_DIR_NAME),
-               "mapred");
+                        FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
     Path file = new Path(dir, "testbinary.seq");
     Random r = new Random();
     long seed = r.nextLong();
@@ -53,6 +53,7 @@
       fail("Failed to create output directory");
     }
 
+    job.set("mapred.task.id", attempt);
     FileOutputFormat.setOutputPath(job, dir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, dir);
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Fri Sep  5 03:50:04 2008
@@ -35,17 +35,19 @@
       throw new RuntimeException("init failure", e);
     }
   }
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
 
   private static Path workDir = 
     new Path(new Path(
                       new Path(System.getProperty("test.build.data", "."), 
                                "data"), 
-                      MRConstants.TEMP_DIR_NAME), 
-             "TestTextOutputFormat");
+                      FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
 
   @SuppressWarnings("unchecked")
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
+    job.set("mapred.task.id", attempt);
     FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);
@@ -98,6 +100,7 @@
     JobConf job = new JobConf();
     String separator = "\u0001";
     job.set("mapred.textoutputformat.separator", separator);
+    job.set("mapred.task.id", attempt);
     FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);

Modified: hadoop/core/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobdetails.jsp?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobdetails.jsp Fri Sep  5 03:50:04 2008
@@ -88,6 +88,41 @@
                   ) + 
               "</td></tr>\n");
   }
+
+  private void printCleanupTaskSummary(JspWriter out,
+                                String jobId,
+                                TaskInProgress[] tasks
+                               ) throws IOException {
+    int totalTasks = tasks.length;
+    int runningTasks = 0;
+    int finishedTasks = 0;
+    int killedTasks = 0;
+    String kind = "cleanup";
+    for(int i=0; i < totalTasks; ++i) {
+      TaskInProgress task = tasks[i];
+      if (task.isComplete()) {
+        finishedTasks += 1;
+      } else if (task.isRunning()) {
+        runningTasks += 1;
+      } else if (task.isFailed()) {
+        killedTasks += 1;
+      }
+    }
+    int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks; 
+    out.print(((runningTasks > 0)  
+               ? "<a href=\"jobtasks.jsp?jobid=" + jobId + "&type="+ kind + 
+                 "&pagenum=1" + "&state=running\">" + " Running" + 
+                 "</a>" 
+               : ((pendingTasks > 0) ? " Pending" :
+                 ((finishedTasks > 0) 
+               ?"<a href=\"jobtasks.jsp?jobid=" + jobId + "&type="+ kind + 
+                "&pagenum=1" + "&state=completed\">" + " Successful"
+                 + "</a>" 
+               : ((killedTasks > 0) 
+               ?"<a href=\"jobtasks.jsp?jobid=" + jobId + "&type="+ kind +
+                "&pagenum=1" + "&state=killed\">" + " Failed" 
+                + "</a>" : "None")))));
+  }
   
   private void printConfirm(JspWriter out, String jobId) throws IOException{
     String url = "jobdetails.jsp?jobid=" + jobId;
@@ -194,6 +229,9 @@
             job.getFinishTime(), job.getStartTime()) + "<br>\n");
       }
     }
+    out.print("<b>Job Cleanup:</b>");
+    printCleanupTaskSummary(out, jobId, job.getCleanupTasks());
+    out.print("<br>\n");
     if (flakyTaskTrackers > 0) {
       out.print("<b>Black-listed TaskTrackers:</b> " + 
           "<a href=\"jobblacklistedtrackers.jsp?jobid=" + jobId + "\">" +

Modified: hadoop/core/trunk/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobtasks.jsp?rev=692408&r1=692407&r2=692408&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobtasks.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobtasks.jsp Fri Sep  5 03:50:04 2008
@@ -39,9 +39,12 @@
      reports = (job != null) ? tracker.getMapTaskReports(jobidObj) : null;
      tasks = (job != null) ? job.getMapTasks() : null;
     }
-  else{
+  else if ("reduce".equals(type)) {
     reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null;
     tasks = (job != null) ? job.getReduceTasks() : null;
+  } else {
+    reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null;
+    tasks = (job != null) ? job.getCleanupTasks() : null;
   }
 %>