Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/04 01:04:54 UTC

svn commit: r1240413 [1/2] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...

Author: vinodkv
Date: Sat Feb  4 00:04:53 2012
New Revision: 1240413

URL: http://svn.apache.org/viewvc?rev=1240413&view=rev
Log:
MAPREDUCE-3711. Fixed MR AM recovery so that only a single selected task's output is recovered, thus reducing the unnecessarily bloated recovery time. Contributed by Robert Joseph Evans.
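
The idea, sketched minimally in Java below (names taken from the RecoveryService diff in this commit): on AM restart, recover the committed output only of tasks that actually contribute to the final job output, i.e. reduces, or maps in a map-only job.

    import org.apache.hadoop.mapreduce.TaskType;

    /** Sketch of the selection rule this patch adds in RecoveryService. */
    final class RecoverySelection {
      static boolean shouldRecoverOutput(TaskType type, int numReducers) {
        // With reducers present, map output only feeds the shuffle and need
        // not be recovered; in a map-only job the maps write final output.
        return type == TaskType.REDUCE
            || (type == TaskType.MAP && numReducers <= 0);
      }
    }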

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sat Feb  4 00:04:53 2012
@@ -673,6 +673,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3727. jobtoken location property in jobconf refers to wrong 
     jobtoken file (tucu)
 
+    MAPREDUCE-3711. Fixed MR AM recovery so that only a single selected
+    task's output is recovered, thus reducing the unnecessarily bloated
+    recovery time. (Robert Joseph Evans via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Feb  4 00:04:53 2012
@@ -559,6 +559,7 @@ public abstract class TaskImpl implement
   }
 
   private void internalError(TaskEventType type) {
+    LOG.error("Invalid event " + type + " on Task " + this.taskId);
     eventHandler.handle(new JobDiagnosticsUpdateEvent(
         this.taskId.getJobId(), "Invalid event " + type + 
         " on Task " + this.taskId));

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Sat Feb  4 00:04:53 2012
@@ -103,6 +103,7 @@ public class LocalContainerAllocator ext
       // This can happen when the connection to the RM has gone down. Keep
       // re-trying until the retryInterval has expired.
       if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
         eventHandler.handle(new JobEvent(this.getJob().getID(),
                                          JobEventType.INTERNAL_ERROR));
         throw new YarnException("Could not contact RM after " +

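The hunk above (and the identical one in RMContainerAllocator below) sits inside the AM's RM-heartbeat retry loop. A self-contained sketch of that elapsed-time retry window, with hypothetical helper names, assuming the loop structure visible in the context lines:

    import java.io.IOException;
    import java.util.concurrent.Callable;

    final class RetryWindow {
      /** Hypothetical helper: keep retrying until a fixed interval has
       *  elapsed since the first attempt, then surface the failure. */
      static <T> T callWithRetryWindow(Callable<T> call, long retryIntervalMs,
          long retryDelayMs) throws Exception {
        long retryStartTime = System.currentTimeMillis();
        while (true) {
          try {
            return call.call();
          } catch (IOException e) {
            if (System.currentTimeMillis() - retryStartTime >= retryIntervalMs) {
              throw e; // retry budget exhausted: give up and report the error
            }
            Thread.sleep(retryDelayMs); // wait before the next attempt
          }
        }
      }
    }
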
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Sat Feb  4 00:04:53 2012
@@ -32,8 +32,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -358,16 +360,24 @@ public class RecoveryService extends Com
           //recover the task output
           TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
               attInfo.getAttemptId());
-          try {
-            committer.recoverTask(taskContext);
+          try { 
+            TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
+            int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); 
+            if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
+              committer.recoverTask(taskContext);
+              LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+            } else {
+              LOG.info("Will not try to recover output for "
+                  + taskContext.getTaskAttemptID());
+            }
           } catch (IOException e) {
+            LOG.error("Caught an exception while trying to recover task "+aId, e);
             actualHandler.handle(new JobDiagnosticsUpdateEvent(
                 aId.getTaskId().getJobId(), "Error in recovering task output " + 
                 e.getMessage()));
             actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
                 JobEventType.INTERNAL_ERROR));
           }
-          LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
           
           // send the done event
           LOG.info("Sending done event to " + aId);

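For context, the two job shapes the new RecoveryService check distinguishes, as a small sketch (hypothetical class name; Job.getInstance as in the 0.23 API):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    final class JobShapes {
      public static void main(String[] args) throws Exception {
        // With reducers: map output only feeds the shuffle, so on AM restart
        // only committed reduce output is worth recovering.
        Job withReducers = Job.getInstance(new Configuration());
        withReducers.setNumReduceTasks(4);

        // Map-only: the maps write the job's final output, so their
        // committed output is exactly what recovery should preserve.
        Job mapOnly = Job.getInstance(new Configuration());
        mapOnly.setNumReduceTasks(0);
      }
    }
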
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Sat Feb  4 00:04:53 2012
@@ -543,6 +543,7 @@ public class RMContainerAllocator extend
       // This can happen when the connection to the RM has gone down. Keep
       // re-trying until the retryInterval has expired.
       if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
         eventHandler.handle(new JobEvent(this.getJob().getID(),
                                          JobEventType.INTERNAL_ERROR));
         throw new YarnException("Could not contact RM after " +

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Sat Feb  4 00:04:53 2012
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRecovery {
 
   private static final Log LOG = LogFactory.getLog(TestRecovery.class);
@@ -112,7 +113,7 @@ public class TestRecovery {
     Assert.assertEquals("Reduce Task state not correct",
         TaskState.RUNNING, reduceTask.getReport().getTaskState());
     
-  //send the fail signal to the 1st map task attempt
+    //send the fail signal to the 1st map task attempt
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
             task1Attempt1.getID(),
@@ -193,7 +194,7 @@ public class TestRecovery {
     //RUNNING state
     app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
     
-  //send the done signal to the 2nd map task
+    //send the done signal to the 2nd map task
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
             mapTask2.getAttempts().values().iterator().next().getID(),
@@ -349,6 +350,151 @@ public class TestRecovery {
     validateOutput();
   }
 
+  @Test
+  public void testOutputRecoveryMapsOnly() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask1 = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+    
+    //before sending the TA_DONE event, make sure attempt has come to 
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+  
+    // write output for map1 just to validate that it is not included in the
+    // final output (map output is not recovered when the job has reducers)
+    writeBadOutput(task1Attempt1, conf);
+    
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+    //stop the app before the job completes.
+    app.stop();
+    
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask1 = it.next();
+    
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+    
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
+    .next();
+
+    //before sending the TA_DONE event, make sure attempt has come to 
+    //RUNNING state
+    app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task2Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for map task to complete
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task2Attempt1.getShufflePort());
+    
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+    
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+    
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+  
+  private void writeBadOutput(TaskAttempt attempt, Configuration conf)
+    throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
+        TypeConverter.fromYarn(attempt.getID()));
+    
+    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat
+        .getRecordWriter(tContext);
+    
+    NullWritable nullWritable = NullWritable.get();
+    try {
+      theRecordWriter.write(key2, val2);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val2);
+      theRecordWriter.write(nullWritable, val1);
+      theRecordWriter.write(key1, nullWritable);
+      theRecordWriter.write(key2, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key1, val1);
+    } finally {
+      theRecordWriter.close(tContext);
+    }
+    
+    OutputFormat outputFormat = ReflectionUtils.newInstance(
+        tContext.getOutputFormatClass(), conf);
+    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+    committer.commitTask(tContext);
+  }
+  
+  
   private void writeOutput(TaskAttempt attempt, Configuration conf)
     throws Exception {
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Sat Feb  4 00:04:53 2012
@@ -19,14 +19,12 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /** An {@link OutputCommitter} that commits files specified 
@@ -42,280 +40,140 @@ public class FileOutputCommitter extends
   /**
    * Temporary directory name 
    */
-  public static final String TEMP_DIR_NAME = "_temporary";
-  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
-  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
-    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
-
-  public void setupJob(JobContext context) throws IOException {
+  public static final String TEMP_DIR_NAME = 
+    org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
+  public static final String SUCCEEDED_FILE_NAME = 
+    org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCEEDED_FILE_NAME;
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+    org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
+  
+  private static Path getOutputPath(JobContext context) {
+    JobConf conf = context.getJobConf();
+    return FileOutputFormat.getOutputPath(conf);
+  }
+  
+  private static Path getOutputPath(TaskAttemptContext context) {
     JobConf conf = context.getJobConf();
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      Path tmpDir = 
-          new Path(outputPath, getJobAttemptBaseDirName(context) + 
-              Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(conf);
-      if (!fileSys.mkdirs(tmpDir)) {
-        LOG.error("Mkdirs failed to create " + tmpDir.toString());
-      }
+    return FileOutputFormat.getOutputPath(conf);
+  }
+  
+  private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped = null;
+  
+  private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
+  getWrapped(JobContext context) throws IOException {
+    if(wrapped == null) {
+      wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
+          getOutputPath(context), context);
     }
+    return wrapped;
   }
-
-  // True if the job requires output.dir marked on successful job.
-  // Note that by default it is set to true.
-  private boolean shouldMarkOutputDir(JobConf conf) {
-    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
+  
+  private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
+  getWrapped(TaskAttemptContext context) throws IOException {
+    if(wrapped == null) {
+      wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
+          getOutputPath(context), context);
+    }
+    return wrapped;
   }
   
-  public void commitJob(JobContext context) throws IOException {
-    //delete the task temp directory from the current jobtempdir
-    JobConf conf = context.getJobConf();
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      FileSystem outputFileSystem = outputPath.getFileSystem(conf);
-      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
-          Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      } else {
-        LOG.warn("Task temp dir could not be deleted " + tmpDir);
-      }
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param context the context of the job.  This is used to get the
+   * application attempt id.
+   * @return the path to store job attempt data.
+   */
+  @Private
+  Path getJobAttemptPath(JobContext context) {
+    return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+        .getJobAttemptPath(context, getOutputPath(context));
+  }
 
-      //move the job output to final place
-      Path jobOutputPath = 
-          new Path(outputPath, getJobAttemptBaseDirName(context));
-      moveJobOutputs(outputFileSystem, 
-          jobOutputPath, outputPath, jobOutputPath);
+  @Private
+  Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
+    return getTaskAttemptPath(context, getOutputPath(context));
+  }
 
-      // delete the _temporary folder in the output folder
-      cleanupJob(context);
-      // check if the output-dir marking is required
-      if (shouldMarkOutputDir(context.getJobConf())) {
-        // create a _success file in the output folder
-        markOutputDirSuccessful(context);
-      }
+  private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
+    Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
+    if(workPath == null) {
+      return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+      .getTaskAttemptPath(context, out);
     }
+    return workPath;
   }
   
-  // Create a _success file in the job's output folder
-  private void markOutputDirSuccessful(JobContext context) throws IOException {
-    JobConf conf = context.getJobConf();
-    // get the o/p path
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      // get the filesys
-      FileSystem fileSys = outputPath.getFileSystem(conf);
-      // create a file in the output folder to mark the job completion
-      Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-      fileSys.create(filePath).close();
-    }
+  /**
+   * Compute the path where the output of a committed task is stored until
+   * the entire job is committed.
+   * @param context the context of the task attempt
+   * @return the path where the output of a committed task is stored until
+   * the entire job is committed.
+   */
+  Path getCommittedTaskPath(TaskAttemptContext context) {
+    return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+        .getCommittedTaskPath(context, getOutputPath(context));
   }
 
-  private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
-      Path finalOutputDir, Path jobOutput) throws IOException {
-    LOG.debug("Told to move job output from " + jobOutput
-        + " to " + finalOutputDir + 
-        " and orig job output path is " + origJobOutputPath);  
-    if (fs.isFile(jobOutput)) {
-      Path finalOutputPath = 
-          getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
-      if (!fs.rename(jobOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of job");
-        }
-        if (!fs.rename(jobOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of job");
-        }
-      }
-      LOG.debug("Moved job output file from " + jobOutput + " to " + 
-          finalOutputPath);
-    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
-      LOG.debug("Job output file " + jobOutput + " is a dir");      
-      FileStatus[] paths = fs.listStatus(jobOutput);
-      Path finalOutputPath = 
-          getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along job output path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
-        }
-      }
-    }
+  public Path getWorkPath(TaskAttemptContext context, Path outputPath) 
+  throws IOException {
+    return getTaskAttemptPath(context, outputPath);
+  }
+  
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    getWrapped(context).setupJob(context);
+  }
+  
+  @Override
+  public void commitJob(JobContext context) throws IOException {
+    getWrapped(context).commitJob(context);
   }
   
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
-    JobConf conf = context.getJobConf();
-    // do the clean up of temporary directory
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(conf);
-      context.getProgressible().progress();
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      } else {
-        LOG.warn("Output Path is Null in cleanup");
-      }
-    }
+    getWrapped(context).cleanupJob(context);
   }
 
   @Override
   public void abortJob(JobContext context, int runState) 
   throws IOException {
-    // simply delete the _temporary dir from the o/p folder of the job
-    cleanupJob(context);
+    JobStatus.State state;
+    if(runState == JobStatus.State.RUNNING.getValue()) {
+      state = JobStatus.State.RUNNING;
+    } else if(runState == JobStatus.State.SUCCEEDED.getValue()) {
+      state = JobStatus.State.SUCCEEDED;
+    } else if(runState == JobStatus.State.FAILED.getValue()) {
+      state = JobStatus.State.FAILED;
+    } else if(runState == JobStatus.State.PREP.getValue()) {
+      state = JobStatus.State.PREP;
+    } else if(runState == JobStatus.State.KILLED.getValue()) {
+      state = JobStatus.State.KILLED;
+    } else {
+      throw new IllegalArgumentException(runState+" is not a valid runState.");
+    }
+    getWrapped(context).abortJob(context, state);
   }
   
   public void setupTask(TaskAttemptContext context) throws IOException {
-    // FileOutputCommitter's setupTask doesn't do anything. Because the
-    // temporary task directory is created on demand when the 
-    // task is writing.
+    getWrapped(context).setupTask(context);
   }
-		  
-  public void commitTask(TaskAttemptContext context) 
-  throws IOException {
-    Path taskOutputPath = getTempTaskOutputPath(context);
-    TaskAttemptID attemptId = context.getTaskAttemptID();
-    JobConf job = context.getJobConf();
-    if (taskOutputPath != null) {
-      FileSystem fs = taskOutputPath.getFileSystem(job);
-      context.getProgressible().progress();
-      if (fs.exists(taskOutputPath)) {
-        // Move the task outputs to the current job attempt output dir
-        JobConf conf = context.getJobConf();
-        Path outputPath = FileOutputFormat.getOutputPath(conf);
-        FileSystem outputFileSystem = outputPath.getFileSystem(conf);
-        Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
-        moveTaskOutputs(context, outputFileSystem, jobOutputPath, 
-            taskOutputPath);
-
-        // Delete the temporary task-specific output directory
-        if (!fs.delete(taskOutputPath, true)) {
-          LOG.info("Failed to delete the temporary output" + 
-          " directory of task: " + attemptId + " - " + taskOutputPath);
-        }
-        LOG.info("Saved output of task '" + attemptId + "' to " + 
-                 jobOutputPath);
-      }
-    }
-  }
-		  
-  private void moveTaskOutputs(TaskAttemptContext context,
-                               FileSystem fs,
-                               Path jobOutputDir,
-                               Path taskOutput) 
-  throws IOException {
-    TaskAttemptID attemptId = context.getTaskAttemptID();
-    context.getProgressible().progress();
-    LOG.debug("Told to move taskoutput from " + taskOutput
-        + " to " + jobOutputDir);    
-    if (fs.isFile(taskOutput)) {
-      Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput, 
-                                          getTempTaskOutputPath(context));
-      if (!fs.rename(taskOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of task: " + 
-                                 attemptId);
-        }
-        if (!fs.rename(taskOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of task: " + 
-        		  attemptId);
-        }
-      }
-      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-    } else if(fs.getFileStatus(taskOutput).isDirectory()) {
-      LOG.debug("Taskoutput " + taskOutput + " is a dir");
-      FileStatus[] paths = fs.listStatus(taskOutput);
-      Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput, 
-	          getTempTaskOutputPath(context));
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
-        }
-      }
-    }
+  
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    getWrapped(context).commitTask(context, getTaskAttemptPath(context));
   }
 
+  @Override
   public void abortTask(TaskAttemptContext context) throws IOException {
-    Path taskOutputPath =  getTempTaskOutputPath(context);
-    if (taskOutputPath != null) {
-      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
-      context.getProgressible().progress();
-      fs.delete(taskOutputPath, true);
-    }
-  }
-
-  @SuppressWarnings("deprecation")
-  private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput, 
-                            Path taskOutputPath) throws IOException {
-    URI taskOutputUri = taskOutput.makeQualified(fs).toUri();
-    URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri();
-    URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
-    if (taskOutputUri == relativePath) { 
-      //taskOutputPath is not a parent of taskOutput
-      throw new IOException("Can not get the relative path: base = " + 
-          taskOutputPathUri + " child = " + taskOutputUri);
-    }
-    if (relativePath.getPath().length() > 0) {
-      return new Path(jobOutputDir, relativePath.getPath());
-    } else {
-      return jobOutputDir;
-    }
+    getWrapped(context).abortTask(context, getTaskAttemptPath(context));
   }
 
+  @Override
   public boolean needsTaskCommit(TaskAttemptContext context) 
   throws IOException {
-    Path taskOutputPath = getTempTaskOutputPath(context);
-    if (taskOutputPath != null) {
-      context.getProgressible().progress();
-      // Get the file-system for the task output directory
-      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
-      // since task output path is created on demand, 
-      // if it exists, task needs a commit
-      if (fs.exists(taskOutputPath)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  Path getTempTaskOutputPath(TaskAttemptContext taskContext) 
-      throws IOException {
-    JobConf conf = taskContext.getJobConf();
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      Path p = new Path(outputPath,
-                     (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-                      "_" + taskContext.getTaskAttemptID().toString()));
-      FileSystem fs = p.getFileSystem(conf);
-      return p.makeQualified(fs);
-    }
-    return null;
-  }
-  
-  Path getWorkPath(TaskAttemptContext taskContext, Path basePath) 
-  throws IOException {
-    // ${mapred.out.dir}/_temporary
-    Path jobTmpDir = new Path(basePath, FileOutputCommitter.TEMP_DIR_NAME);
-    FileSystem fs = jobTmpDir.getFileSystem(taskContext.getJobConf());
-    if (!fs.exists(jobTmpDir)) {
-      throw new IOException("The temporary job-output directory " + 
-          jobTmpDir.toString() + " doesn't exist!"); 
-    }
-    // ${mapred.out.dir}/_temporary/_${taskid}
-    String taskid = taskContext.getTaskAttemptID().toString();
-    Path taskTmpDir = new Path(jobTmpDir, "_" + taskid);
-    if (!fs.mkdirs(taskTmpDir)) {
-      throw new IOException("Mkdirs failed to create " 
-          + taskTmpDir.toString());
-    }
-    return taskTmpDir;
+    return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context));
   }
   
   @Override
@@ -326,54 +184,6 @@ public class FileOutputCommitter extends
   @Override
   public void recoverTask(TaskAttemptContext context)
       throws IOException {
-    Path outputPath = FileOutputFormat.getOutputPath(context.getJobConf());
-    context.progress();
-    Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
-    int previousAttempt =         
-        context.getConfiguration().getInt(
-            MRConstants.APPLICATION_ATTEMPT_ID, 0) - 1;
-    if (previousAttempt < 0) {
-      LOG.warn("Cannot recover task output for first attempt...");
-      return;
-    }
-
-    FileSystem outputFileSystem = 
-        outputPath.getFileSystem(context.getJobConf());
-    Path pathToRecover = 
-        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
-    if (outputFileSystem.exists(pathToRecover)) {
-      // Move the task outputs to their final place
-      LOG.debug("Trying to recover task from " + pathToRecover
-          + " into " + jobOutputPath);
-      moveJobOutputs(outputFileSystem, 
-          pathToRecover, jobOutputPath, pathToRecover);
-      LOG.info("Saved output of job to " + jobOutputPath);
-    }
-  }
-
-  protected static String getJobAttemptBaseDirName(JobContext context) {
-    int appAttemptId = 
-        context.getJobConf().getInt(
-            MRConstants.APPLICATION_ATTEMPT_ID, 0);
-    return getJobAttemptBaseDirName(appAttemptId);
-  }
-
-  protected static String getJobTempDirName(TaskAttemptContext context) {
-    int appAttemptId = 
-        context.getJobConf().getInt(
-            MRConstants.APPLICATION_ATTEMPT_ID, 0);
-    return getJobAttemptBaseDirName(appAttemptId);
-  }
-
-  protected static String getJobAttemptBaseDirName(int appAttemptId) {
-    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + 
-      + appAttemptId;
-  }
-
-  protected static String getTaskAttemptBaseDirName(
-      TaskAttemptContext context) {
-    return getJobTempDirName(context) + Path.SEPARATOR + 
-      FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-      "_" + context.getTaskAttemptID().toString();
+    getWrapped(context).recoverTask(context);
   }
 }

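One detail in the rewrite above: the old API reports the job's final state as an int, so abortJob maps it back onto the new API's JobStatus.State enum via an if/else chain. A hypothetical equivalent (not in the patch) that scans the enum values instead:

    import org.apache.hadoop.mapreduce.JobStatus;

    final class RunStates {
      /** Hypothetical replacement for the if/else chain in abortJob above. */
      static JobStatus.State toState(int runState) {
        for (JobStatus.State s : JobStatus.State.values()) {
          if (s.getValue() == runState) {
            return s;
          }
        }
        throw new IllegalArgumentException(runState + " is not a valid runState.");
      }
    }
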
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Sat Feb  4 00:04:53 2012
@@ -525,7 +525,7 @@ abstract public class Task implements Wr
     if (outputPath != null) {
       if ((committer instanceof FileOutputCommitter)) {
         FileOutputFormat.setWorkOutputPath(conf, 
-          ((FileOutputCommitter)committer).getTempTaskOutputPath(taskContext));
+          ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
       } else {
         FileOutputFormat.setWorkOutputPath(conf, outputPath);
       }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java Sat Feb  4 00:04:53 2012
@@ -51,17 +51,21 @@ import org.apache.hadoop.classification.
  *   Discard the task commit.
  *   </li>
  * </ol>
+ * The methods in this class can be called from several different processes and
+ * from several different contexts.  It is important to know which process and
+ * which context each is called from.  Each method should be marked accordingly
+ * in its documentation.
  * 
  * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
  * @see JobContext
  * @see TaskAttemptContext 
- *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public abstract class OutputCommitter {
   /**
-   * For the framework to setup the job output during initialization
+   * For the framework to setup the job output during initialization.  This is
+   * called from the application master process for the entire job.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException if temporary output could not be created
@@ -69,11 +73,12 @@ public abstract class OutputCommitter {
   public abstract void setupJob(JobContext jobContext) throws IOException;
 
   /**
-   * For cleaning up the job's output after job completion
+   * For cleaning up the job's output after job completion.  This is called
+   * from the application master process for the entire job.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
-   * @deprecated Use {@link #commitJob(JobContext)} or
+   * @deprecated Use {@link #commitJob(JobContext)} and
    *                 {@link #abortJob(JobContext, JobStatus.State)} instead.
    */
   @Deprecated
@@ -81,7 +86,8 @@ public abstract class OutputCommitter {
 
   /**
    * For committing job's output after successful job completion. Note that this
-   * is invoked for jobs with final runstate as SUCCESSFUL.	
+   * is invoked for jobs with final runstate as SUCCESSFUL.  This is called
+   * from the application master process for the entire job.	
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
@@ -94,7 +100,8 @@ public abstract class OutputCommitter {
   /**
    * For aborting an unsuccessful job's output. Note that this is invoked for 
    * jobs with final runstate as {@link JobStatus.State#FAILED} or 
-   * {@link JobStatus.State#KILLED}.
+   * {@link JobStatus.State#KILLED}.  This is called from the application
+   * master process for the entire job.
    *
    * @param jobContext Context of the job whose output is being written.
    * @param state final runstate of the job
@@ -106,7 +113,8 @@ public abstract class OutputCommitter {
   }
   
   /**
-   * Sets up output for the task.
+   * Sets up output for the task.  This is called from each individual task's
+   * process that will output to HDFS, and it is called just for that task.
    * 
    * @param taskContext Context of the task whose output is being written.
    * @throws IOException
@@ -115,7 +123,9 @@ public abstract class OutputCommitter {
   throws IOException;
   
   /**
-   * Check whether task needs a commit
+   * Check whether task needs a commit.  This is called from each individual
+   * task's process that will output to HDFS, and it is called just for that
+   * task.
    * 
    * @param taskContext
    * @return true/false
@@ -125,18 +135,23 @@ public abstract class OutputCommitter {
   throws IOException;
 
   /**
-   * To promote the task's temporary output to final output location
-   * 
-   * The task's output is moved to the job's output directory.
+   * To promote the task's temporary output to final output location.
+   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
+   * task is the task that the AM determines finished first, this method
+   * is called to commit an individual task's output.  This is to mark
+   * that task's output as complete, as {@link #commitJob(JobContext)} will 
+   * also be called later on if the entire job finishes successfully. This
+   * is called from a task's process.
    * 
    * @param taskContext Context of the task whose output is being written.
-   * @throws IOException if commit is not 
+   * @throws IOException if commit is not successful. 
    */
   public abstract void commitTask(TaskAttemptContext taskContext)
   throws IOException;
   
   /**
-   * Discard the task output
+   * Discard the task output. This is called from a task's process to clean 
+   * up a single task's output that has not yet been committed.
    * 
    * @param taskContext
    * @throws IOException
@@ -164,7 +179,8 @@ public abstract class OutputCommitter {
    * The retry-count for the job will be passed via the 
    * {@link MRJobConfig#APPLICATION_ATTEMPT_ID} key in  
    * {@link TaskAttemptContext#getConfiguration()} for the 
-   * <code>OutputCommitter</code>.
+   * <code>OutputCommitter</code>.  This is called from the application master
+   * process, but it is called individually for each task.
    * 
    * If an exception is thrown the task will be attempted again. 
    * 

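To make the new process annotations concrete, here is a minimal no-op committer (a sketch, not part of the patch) with each override commented with where it runs per the updated javadoc:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    class NoOpOutputCommitter extends OutputCommitter {
      @Override
      public void setupJob(JobContext jobContext) throws IOException {
        // Runs in the application master process, once for the entire job.
      }
      @Override
      public void setupTask(TaskAttemptContext taskContext) throws IOException {
        // Runs in each task's own process, just for that task.
      }
      @Override
      public boolean needsTaskCommit(TaskAttemptContext taskContext)
          throws IOException {
        // Runs in the task's process; returning false skips commitTask.
        return false;
      }
      @Override
      public void commitTask(TaskAttemptContext taskContext) throws IOException {
        // Runs in the task's process, only for the attempt chosen to commit.
      }
      @Override
      public void abortTask(TaskAttemptContext taskContext) throws IOException {
        // Runs in the task's process to discard uncommitted output.
      }
    }
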
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Sat Feb  4 00:04:53 2012
@@ -19,16 +19,16 @@
 package org.apache.hadoop.mapreduce.lib.output;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -37,41 +37,239 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 /** An {@link OutputCommitter} that commits files specified 
- * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 
+ * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
  **/
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FileOutputCommitter extends OutputCommitter {
-
   private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
 
-  /**
-   * Temporary directory name 
+  /** 
+   * Name of the directory where pending data, i.e. data that has not yet
+   * been committed, is placed.
    */
-  protected static final String TEMP_DIR_NAME = "_temporary";
+  public static final String PENDING_DIR_NAME = "_temporary";
   public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
-  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
+  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
     "mapreduce.fileoutputcommitter.marksuccessfuljobs";
-  private FileSystem outputFileSystem = null;
   private Path outputPath = null;
   private Path workPath = null;
 
   /**
    * Create a file output committer
-   * @param outputPath the job's output path
+   * @param outputPath the job's output path, or null if you want the output
+   * committer to act as a noop.
    * @param context the task's context
    * @throws IOException
    */
   public FileOutputCommitter(Path outputPath, 
                              TaskAttemptContext context) throws IOException {
+    this(outputPath, (JobContext)context);
+    if (outputPath != null) {
+      workPath = getTaskAttemptPath(context, outputPath);
+    }
+  }
+  
+  /**
+   * Create a file output committer
+   * @param outputPath the job's output path, or null if you want the output
+   * committer to act as a noop.
+   * @param context the task's context
+   * @throws IOException
+   */
+  @Private
+  public FileOutputCommitter(Path outputPath, 
+                             JobContext context) throws IOException {
     if (outputPath != null) {
-      this.outputPath = outputPath;
-      outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
-      workPath = new Path(outputPath,
-                          getTaskAttemptBaseDirName(context))
-                          .makeQualified(outputFileSystem);
+      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+      this.outputPath = fs.makeQualified(outputPath);
     }
   }
+  
+  /**
+   * @return the path where final output of the job should be placed.  This
+   * could also be considered the committed application attempt path.
+   */
+  private Path getOutputPath() {
+    return this.outputPath;
+  }
+  
+  /**
+   * @return true if we have an output path set, else false.
+   */
+  private boolean hasOutputPath() {
+    return this.outputPath != null;
+  }
+  
+  /**
+   * @return the path where the output of pending job attempts are
+   * stored.
+   */
+  private Path getPendingJobAttemptsPath() {
+    return getPendingJobAttemptsPath(getOutputPath());
+  }
+  
+  /**
+   * Get the location of pending job attempts.
+   * @param out the base output directory.
+   * @return the location of pending job attempts.
+   */
+  private static Path getPendingJobAttemptsPath(Path out) {
+    return new Path(out, PENDING_DIR_NAME);
+  }
+  
+  /**
+   * Get the Application Attempt Id for this job
+   * @param context the context to look in
+   * @return the Application Attempt Id for a given job.
+   */
+  private static int getAppAttemptId(JobContext context) {
+    return context.getConfiguration().getInt(
+        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+  }
+  
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param context the context of the job.  This is used to get the
+   * application attempt id.
+   * @return the path to store job attempt data.
+   */
+  public Path getJobAttemptPath(JobContext context) {
+    return getJobAttemptPath(context, getOutputPath());
+  }
+  
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param context the context of the job.  This is used to get the
+   * application attempt id.
+   * @param out the output path to place these in.
+   * @return the path to store job attempt data.
+   */
+  public static Path getJobAttemptPath(JobContext context, Path out) {
+    return getJobAttemptPath(getAppAttemptId(context), out);
+  }
+  
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path to store job attempt data.
+   */
+  private Path getJobAttemptPath(int appAttemptId) {
+    return getJobAttemptPath(appAttemptId, getOutputPath());
+  }
+  
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path to store job attempt data.
+   */
+  private static Path getJobAttemptPath(int appAttemptId, Path out) {
+    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
+  }
+  
+  /**
+   * Compute the path where the output of pending task attempts are stored.
+   * @param context the context of the job with pending tasks. 
+   * @return the path where the output of pending task attempts are stored.
+   */
+  private Path getPendingTaskAttemptsPath(JobContext context) {
+    return getPendingTaskAttemptsPath(context, getOutputPath());
+  }
+  
+  /**
+   * Compute the path where the output of pending task attempts are stored.
+   * @param context the context of the job with pending tasks. 
+   * @return the path where the output of pending task attempts are stored.
+   */
+  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
+    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
+  }
+  
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed.
+   * 
+   * @param context the context of the task attempt.
+   * @return the path where a task attempt should be stored.
+   */
+  public Path getTaskAttemptPath(TaskAttemptContext context) {
+    return new Path(getPendingTaskAttemptsPath(context), 
+        String.valueOf(context.getTaskAttemptID()));
+  }
+  
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed.
+   * 
+   * @param context the context of the task attempt.
+   * @param out The output path to put things in.
+   * @return the path where a task attempt should be stored.
+   */
+  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
+    return new Path(getPendingTaskAttemptsPath(context, out), 
+        String.valueOf(context.getTaskAttemptID()));
+  }
+  
+  /**
+   * Compute the path where the output of a committed task is stored until
+   * the entire job is committed.
+   * @param context the context of the task attempt
+   * @return the path where the output of a committed task is stored until
+   * the entire job is committed.
+   */
+  public Path getCommittedTaskPath(TaskAttemptContext context) {
+    return getCommittedTaskPath(getAppAttemptId(context), context);
+  }
+  
+  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
+    return getCommittedTaskPath(getAppAttemptId(context), context, out);
+  }
+  
+  /**
+   * Compute the path where the output of a committed task is stored until the
+   * entire job is committed for a specific application attempt.
+   * @param appAttemptId the id of the application attempt to use
+   * @param context the context of any task.
+   * @return the path where the output of a committed task is stored.
+   */
+  private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
+    return new Path(getJobAttemptPath(appAttemptId),
+        String.valueOf(context.getTaskAttemptID().getTaskID()));
+  }
+  
+  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
+    return new Path(getJobAttemptPath(appAttemptId, out),
+        String.valueOf(context.getTaskAttemptID().getTaskID()));
+  }
+  
+  private static class CommittedTaskFilter implements PathFilter {
+    @Override
+    public boolean accept(Path path) {
+      return !PENDING_DIR_NAME.equals(path.getName());
+    }
+  }
+  
+  /**
+   * Get a list of all paths where output from committed tasks are stored.
+   * @param context the context of the current job
+   * @return the list of these Paths/FileStatuses. 
+   * @throws IOException
+   */
+  private FileStatus[] getAllCommittedTaskPaths(JobContext context) 
+    throws IOException {
+    Path jobAttemptPath = getJobAttemptPath(context);
+    FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
+    return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
+  }
+  
+  /**
+   * Get the directory that the task should write results into.
+   * @return the work directory
+   * @throws IOException
+   */
+  public Path getWorkPath() throws IOException {
+    return workPath;
+  }
 
   /**
    * Create the temporary directory that is the root of all of the task 
@@ -79,116 +277,103 @@ public class FileOutputCommitter extends
    * @param context the job's context
    */
   public void setupJob(JobContext context) throws IOException {
-    if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + 
-    		  Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (!fileSys.mkdirs(tmpDir)) {
-        LOG.error("Mkdirs failed to create " + tmpDir.toString());
+    if (hasOutputPath()) {
+      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
+      FileSystem fs = pendingJobAttemptsPath.getFileSystem(
+          context.getConfiguration());
+      if (!fs.mkdirs(pendingJobAttemptsPath)) {
+        LOG.error("Mkdirs failed to create " + pendingJobAttemptsPath);
       }
-    }
-  }
-
-  // True if the job requires output.dir marked on successful job.
-  // Note that by default it is set to true.
-  private boolean shouldMarkOutputDir(Configuration conf) {
-    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
-  }
-  
-  // Create a _success file in the job's output dir
-  private void markOutputDirSuccessful(MRJobConfig context) throws IOException {
-    if (outputPath != null) {
-      // create a file in the output folder to mark the job completion
-      Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-      outputFileSystem.create(filePath).close();
+    } else {
+      LOG.warn("Output Path is null in setupJob()");
     }
   }
   
   /**
-   * Move all job output to the final place.
+   * The job has completed, so move all committed task output to the final
+   * output dir.
    * Delete the temporary directory, including all of the work directories.
    * Create a _SUCCESS file to mark it as successful.
    * @param context the job's context
    */
   public void commitJob(JobContext context) throws IOException {
-    if (outputPath != null) {
-      //delete the task temp directory from the current jobtempdir
-      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
-          Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      } else {
-        LOG.warn("Task temp dir could not be deleted " + tmpDir);
+    if (hasOutputPath()) {
+      Path finalOutput = getOutputPath();
+      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
+      for(FileStatus stat: getAllCommittedTaskPaths(context)) {
+        mergePaths(fs, stat, finalOutput);
       }
 
-      //move the job output to final place
-      Path jobOutputPath = 
-          new Path(outputPath, getJobAttemptBaseDirName(context));
-      moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
-
       // delete the _temporary folder and create a _done file in the o/p folder
       cleanupJob(context);
-      if (shouldMarkOutputDir(context.getConfiguration())) {
-        markOutputDirSuccessful(context);
+      // True if the job requires the output dir to be marked on successful
+      // completion.  Note that by default it is set to true.
+      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
+        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        fs.create(markerPath).close();
       }
+    } else {
+      LOG.warn("Output Path is null in commitJob()");
     }
   }
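
A job that does not want the marker file can turn it off through the same
boolean key checked above. A minimal sketch, assuming
SUCCESSFUL_JOB_OUTPUT_DIR_MARKER resolves to the literal key used below:

    import org.apache.hadoop.conf.Configuration;

    public class DisableSuccessMarker {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // With this set to false, commitJob() skips creating the _SUCCESS file.
        conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
            false);
      }
    }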
 
   /**
-   * Move job output to final location 
-   * @param fs Filesystem handle
-   * @param origJobOutputPath The original location of the job output
-   * Required to generate the relative path for correct moving of data. 
-   * @param finalOutputDir The final output directory to which the job output 
-   *                       needs to be moved
-   * @param jobOutput The current job output directory being moved 
-   * @throws IOException
-   */
-  private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath, 
-      Path finalOutputDir, Path jobOutput) throws IOException {
-    LOG.debug("Told to move job output from " + jobOutput
-        + " to " + finalOutputDir + 
-        " and orig job output path is " + origJobOutputPath);    
-    if (fs.isFile(jobOutput)) {
-      Path finalOutputPath = 
-          getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
-      if (!fs.rename(jobOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of job");
-        }
-        if (!fs.rename(jobOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of job");
-        }
-      }
-      LOG.debug("Moved job output file from " + jobOutput + " to " + 
-          finalOutputPath);
-    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
-      LOG.debug("Job output file " + jobOutput + " is a dir");
-      FileStatus[] paths = fs.listStatus(jobOutput);
-      Path finalOutputPath = 
-          getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along job output path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
-        }
-      }
-    }
+   * Merge two paths together.  Anything in from will be moved into to; if
+   * there are any name conflicts while merging, the files or directories
+   * in from win.
+   * @param fs the File System to use
+   * @param from the path data is coming from.
+   * @param to the path data is going to.
+   * @throws IOException on any error
+   */
+  private static void mergePaths(FileSystem fs, final FileStatus from,
+      final Path to)
+    throws IOException {
+     LOG.debug("Merging data from "+from+" to "+to);
+     if(from.isFile()) {
+       if(fs.exists(to)) {
+         if(!fs.delete(to, true)) {
+           throw new IOException("Failed to delete "+to);
+         }
+       }
+
+       if(!fs.rename(from.getPath(), to)) {
+         throw new IOException("Failed to rename "+from+" to "+to);
+       }
+     } else if(from.isDirectory()) {
+       if(fs.exists(to)) {
+         FileStatus toStat = fs.getFileStatus(to);
+         if(!toStat.isDirectory()) {
+           if(!fs.delete(to, true)) {
+             throw new IOException("Failed to delete "+to);
+           }
+           if(!fs.rename(from.getPath(), to)) {
+             throw new IOException("Failed to rename "+from+" to "+to);
+           }
+         } else {
+           //It is a directory, so merge everything in the directories
+           for(FileStatus subFrom: fs.listStatus(from.getPath())) {
+             Path subTo = new Path(to, subFrom.getPath().getName());
+             mergePaths(fs, subFrom, subTo);
+           }
+         }
+       } else {
+         //it does not exist, so just rename
+         if(!fs.rename(from.getPath(), to)) {
+           throw new IOException("Failed to rename "+from+" to "+to);
+         }
+       }
+     }
   }
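
The recursion in mergePaths() is easier to follow away from the FileSystem
API. Here is a local-disk sketch of the same semantics, a hypothetical helper
using java.io.File that is not part of this patch:

    import java.io.File;
    import java.io.IOException;

    public class MergeSketch {
      // Move everything under 'from' into 'to'; on a name conflict, 'from' wins.
      static void merge(File from, File to) throws IOException {
        if (from.isFile()) {
          // Remove a conflicting destination so the rename cannot silently
          // fail (the patch uses a recursive FileSystem.delete here).
          if (to.exists() && !to.delete()) {
            throw new IOException("Failed to delete " + to);
          }
          if (!from.renameTo(to)) {
            throw new IOException("Failed to rename " + from + " to " + to);
          }
        } else if (from.isDirectory()) {
          if (to.exists() && !to.isDirectory()) {
            // A file is in the way of a directory: replace it wholesale.
            if (!to.delete() || !from.renameTo(to)) {
              throw new IOException("Failed to replace " + to + " with " + from);
            }
          } else if (to.exists()) {
            // Both are directories: merge entry by entry.
            for (File child : from.listFiles()) {
              merge(child, new File(to, child.getName()));
            }
          } else {
            // Destination absent: a single rename moves the whole tree.
            if (!from.renameTo(to)) {
              throw new IOException("Failed to rename " + from + " to " + to);
            }
          }
        }
      }
    }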
 
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
-    if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      }
+    if (hasOutputPath()) {
+      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
+      FileSystem fs = pendingJobAttemptsPath
+          .getFileSystem(context.getConfiguration());
+      fs.delete(pendingJobAttemptsPath, true);
     } else {
-      LOG.warn("Output Path is null in cleanup");
+      LOG.warn("Output Path is null in cleanupJob()");
     }
   }
 
@@ -217,69 +402,40 @@ public class FileOutputCommitter extends
    * Move the files from the work directory to the job output directory
    * @param context the task context
    */
+  @Override
   public void commitTask(TaskAttemptContext context) 
   throws IOException {
-    TaskAttemptID attemptId = context.getTaskAttemptID();
-    if (workPath != null) {
-      context.progress();
-      if (outputFileSystem.exists(workPath)) {
-        // Move the task outputs to the current job attempt output dir
-    	  Path jobOutputPath = 
-    	      new Path(outputPath, getJobAttemptBaseDirName(context));
-        moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
-        // Delete the temporary task-specific output directory
-        if (!outputFileSystem.delete(workPath, true)) {
-          LOG.warn("Failed to delete the temporary output" + 
-          " directory of task: " + attemptId + " - " + workPath);
-        }
-        LOG.info("Saved output of task '" + attemptId + "' to " + 
-            jobOutputPath);
-      }
-    }
+    commitTask(context, null);
   }
 
-  /**
-   * Move all of the files from the work directory to the final output
-   * @param context the task context
-   * @param fs the output file system
-   * @param jobOutputDir the final output direcotry
-   * @param taskOutput the work path
-   * @throws IOException
-   */
-  private void moveTaskOutputs(TaskAttemptContext context,
-                               FileSystem fs,
-                               Path jobOutputDir,
-                               Path taskOutput) 
+  @Private
+  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
   throws IOException {
     TaskAttemptID attemptId = context.getTaskAttemptID();
-    context.progress();
-    LOG.debug("Told to move taskoutput from " + taskOutput
-        + " to " + jobOutputDir);    
-    if (fs.isFile(taskOutput)) {
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
-                                          workPath);
-      if (!fs.rename(taskOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of task: " + 
-                                 attemptId);
-        }
-        if (!fs.rename(taskOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of task: " + 
-        		  attemptId);
-        }
+    if (hasOutputPath()) {
+      context.progress();
+      if(taskAttemptPath == null) {
+        taskAttemptPath = getTaskAttemptPath(context);
       }
-      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-    } else if(fs.getFileStatus(taskOutput).isDirectory()) {
-      LOG.debug("Taskoutput " + taskOutput + " is a dir");
-      FileStatus[] paths = fs.listStatus(taskOutput);
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
+      Path committedTaskPath = getCommittedTaskPath(context);
+      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+      if (fs.exists(taskAttemptPath)) {
+        if(fs.exists(committedTaskPath)) {
+          if(!fs.delete(committedTaskPath, true)) {
+            throw new IOException("Could not delete " + committedTaskPath);
+          }
+        }
+        if(!fs.rename(taskAttemptPath, committedTaskPath)) {
+          throw new IOException("Could not rename " + taskAttemptPath + " to "
+              + committedTaskPath);
         }
+        LOG.info("Saved output of task '" + attemptId + "' to " + 
+            committedTaskPath);
+      } else {
+        LOG.warn("No Output found for " + attemptId);
       }
+    } else {
+      LOG.warn("Output Path is null in commitTask()");
     }
   }
 
@@ -289,38 +445,22 @@ public class FileOutputCommitter extends
    */
   @Override
   public void abortTask(TaskAttemptContext context) throws IOException {
-    if (workPath != null) { 
-      context.progress();
-      outputFileSystem.delete(workPath, true);
-    }
+    abortTask(context, null);
   }
 
-  /**
-   * Find the final name of a given output file, given the job output directory
-   * and the work directory.
-   * @param jobOutputDir the job's output directory
-   * @param taskOutput the specific task output file
-   * @param taskOutputPath the job's work directory
-   * @return the final path for the specific output file
-   * @throws IOException
-   */
-  private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
-                            Path taskOutputPath) throws IOException {    
-    URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(), 
-        outputFileSystem.getWorkingDirectory()).toUri();
-    URI taskOutputPathUri = 
-        taskOutputPath.makeQualified(
-            outputFileSystem.getUri(),
-            outputFileSystem.getWorkingDirectory()).toUri();
-    URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
-    if (taskOutputUri == relativePath) {
-      throw new IOException("Can not get the relative path: base = " + 
-          taskOutputPathUri + " child = " + taskOutputUri);
-    }
-    if (relativePath.getPath().length() > 0) {
-      return new Path(jobOutputDir, relativePath.getPath());
+  @Private
+  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
+    if (hasOutputPath()) { 
+      context.progress();
+      if(taskAttemptPath == null) {
+        taskAttemptPath = getTaskAttemptPath(context);
+      }
+      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+      if(!fs.delete(taskAttemptPath, true)) {
+        LOG.warn("Could not delete "+taskAttemptPath);
+      }
     } else {
-      return jobOutputDir;
+      LOG.warn("Output Path is null in abortTask()");
     }
   }
 
@@ -331,16 +471,20 @@ public class FileOutputCommitter extends
   @Override
   public boolean needsTaskCommit(TaskAttemptContext context
                                  ) throws IOException {
-    return workPath != null && outputFileSystem.exists(workPath);
+    return needsTaskCommit(context, null);
   }
 
-  /**
-   * Get the directory that the task should write results into
-   * @return the work directory
-   * @throws IOException
-   */
-  public Path getWorkPath() throws IOException {
-    return workPath;
+  @Private
+  public boolean needsTaskCommit(TaskAttemptContext context,
+      Path taskAttemptPath) throws IOException {
+    if(hasOutputPath()) {
+      if(taskAttemptPath == null) {
+        taskAttemptPath = getTaskAttemptPath(context);
+      }
+      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+      return fs.exists(taskAttemptPath);
+    }
+    return false;
   }
 
   @Override
@@ -352,43 +496,35 @@ public class FileOutputCommitter extends
   public void recoverTask(TaskAttemptContext context)
       throws IOException {
     context.progress();
-    Path jobOutputPath = 
-        new Path(outputPath, getJobAttemptBaseDirName(context));
-    int previousAttempt =         
-        context.getConfiguration().getInt(
-            MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
+    TaskAttemptID attemptId = context.getTaskAttemptID();
+    int previousAttempt = getAppAttemptId(context) - 1;
     if (previousAttempt < 0) {
       throw new IOException ("Cannot recover task output for first attempt...");
     }
-
-    Path pathToRecover = 
-        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
-    LOG.debug("Trying to recover task from " + pathToRecover
-        + " into " + jobOutputPath);
-    if (outputFileSystem.exists(pathToRecover)) {
-      // Move the task outputs to their final place
-      moveJobOutputs(outputFileSystem, 
-          pathToRecover, jobOutputPath, pathToRecover);
-      LOG.info("Saved output of job to " + jobOutputPath);
+    
+    Path committedTaskPath = getCommittedTaskPath(context);
+    Path previousCommittedTaskPath = getCommittedTaskPath(
+        previousAttempt, context);
+    FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
+    
+    LOG.debug("Trying to recover task from " + previousCommittedTaskPath 
+        + " into " + committedTaskPath);
+    if (fs.exists(previousCommittedTaskPath)) {
+      if(fs.exists(committedTaskPath)) {
+        if(!fs.delete(committedTaskPath, true)) {
+          throw new IOException("Could not delete "+committedTaskPath);
+        }
+      }
+      //Rename can fail if the parent directory does not yet exist.
+      Path committedParent = committedTaskPath.getParent();
+      fs.mkdirs(committedParent);
+      if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+        throw new IOException("Could not rename " + previousCommittedTaskPath +
+            " to " + committedTaskPath);
+      }
+      LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
+    } else {
+      LOG.warn(attemptId+" had no output to recover.");
     }
   }
-
-  protected static String getJobAttemptBaseDirName(JobContext context) {
-    int appAttemptId = 
-        context.getConfiguration().getInt(
-            MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
-    return getJobAttemptBaseDirName(appAttemptId);
-  }
-
-  protected static String getJobAttemptBaseDirName(int appAttemptId) {
-    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + 
-      + appAttemptId;
-  }
-
-  protected static String getTaskAttemptBaseDirName(
-      TaskAttemptContext context) {
-	  return getJobAttemptBaseDirName(context) + Path.SEPARATOR + 
-	  FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-      "_" + context.getTaskAttemptID().toString();
-  }
 }
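
With this layout, recovering a task after an AM restart is a single directory
promotion rather than a recursive move of the whole job output, which is where
the recovery-time savings in this change come from. A minimal sketch of the
on-disk effect of recoverTask(), with hypothetical ids and paths:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RecoverySketch {
      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        Path out = new Path("/tmp/job-output");  // hypothetical output dir

        // Attempt 1 committed this task's output before the AM died:
        Path prev = new Path(out, "_temporary/1/task_1234_0001_m_000003");
        // Attempt 2 adopts it with a single rename instead of re-running it:
        Path cur = new Path(out, "_temporary/2/task_1234_0001_m_000003");

        if (fs.exists(prev)) {
          if (fs.exists(cur) && !fs.delete(cur, true)) {
            throw new IOException("Could not delete " + cur);
          }
          fs.mkdirs(cur.getParent());  // rename fails if the parent is missing
          if (!fs.rename(prev, cur)) {
            throw new IOException("Could not rename " + prev + " to " + cur);
          }
        }
      }
    }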

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Feb  4 00:04:53 2012
@@ -105,10 +105,9 @@ public class TestFileOutputCommitter ext
 
     // do commit
     committer.commitTask(tContext);
-    Path jobTempDir1 = new Path(outDir, 
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir1.toString()).exists()));
+    Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+    File jtd1 = new File(jobTempDir1.toUri().getPath());
+    assertTrue(jtd1.exists());
     validateContent(jobTempDir1);        
     
     //now while running the second app attempt, 
@@ -119,14 +118,12 @@ public class TestFileOutputCommitter ext
     JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
     TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
     FileOutputCommitter committer2 = new FileOutputCommitter();
-    committer.setupJob(jContext2);
-    Path jobTempDir2 = new Path(outDir, 
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf2.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir2.toString()).exists()));
+    committer2.setupJob(jContext2);
+    Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
     
-    tContext2.getConfiguration().setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
     committer2.recoverTask(tContext2);
+    File jtd2 = new File(jobTempDir2.toUri().getPath());
+    assertTrue(jtd2.exists());
     validateContent(jobTempDir2);
     
     committer2.commitJob(jContext2);
@@ -135,7 +132,8 @@ public class TestFileOutputCommitter ext
   }
 
   private void validateContent(Path dir) throws IOException {
-    File expectedFile = new File(new Path(dir, partFile).toString());
+    File fdir = new File(dir.toUri().getPath());
+    File expectedFile = new File(fdir, partFile);
     StringBuffer expectedOutput = new StringBuffer();
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
@@ -244,21 +242,17 @@ public class TestFileOutputCommitter ext
 
     // do abort
     committer.abortTask(tContext);
-    FileSystem outputFileSystem = outDir.getFileSystem(conf);
-    Path workPath = new Path(outDir,
-        committer.getTaskAttemptBaseDirName(tContext))
-        .makeQualified(outputFileSystem);
-    File expectedFile = new File(new Path(workPath, partFile)
-        .toString());
+    File out = new File(outDir.toUri().getPath());
+    Path workPath = committer.getWorkPath(tContext, outDir);
+    File wp = new File(workPath.toUri().getPath());
+    File expectedFile = new File(wp, partFile);
     assertFalse("task temp dir still exists", expectedFile.exists());
 
     committer.abortJob(jContext, JobStatus.State.FAILED);
-    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
-        .toString());
+    expectedFile = new File(out, FileOutputCommitter.TEMP_DIR_NAME);
     assertFalse("job temp dir still exists", expectedFile.exists());
-    assertEquals("Output directory not empty", 0, new File(outDir.toString())
-        .listFiles().length);
-    FileUtil.fullyDelete(new File(outDir.toString()));
+    assertEquals("Output directory not empty", 0, out.listFiles().length);
+    FileUtil.fullyDelete(out);
   }
 
   public static class FakeFileSystem extends RawLocalFileSystem {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Sat Feb  4 00:04:53 2012
@@ -60,6 +60,22 @@ public class TestFileOutputCommitter ext
   private Text val2 = new Text("val2");
 
   
+  private static void cleanup() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = outDir.getFileSystem(conf);
+    fs.delete(outDir, true);
+  }
+  
+  @Override
+  public void setUp() throws IOException {
+    cleanup();
+  }
+  
+  @Override
+  public void tearDown() throws IOException {
+    cleanup();
+  }
+  
   private void writeOutput(RecordWriter theRecordWriter,
       TaskAttemptContext context) throws IOException, InterruptedException {
     NullWritable nullWritable = NullWritable.get();
@@ -114,11 +130,10 @@ public class TestFileOutputCommitter ext
 
     // do commit
     committer.commitTask(tContext);
-    Path jobTempDir1 = new Path(outDir, 
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir1.toString()).exists()));
-    validateContent(jobTempDir1);    
+    Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+    File jtd = new File(jobTempDir1.toUri().getPath());
+    assertTrue(jtd.exists());
+    validateContent(jtd);    
     
     //now while running the second app attempt, 
     //recover the task output from first attempt
@@ -128,15 +143,13 @@ public class TestFileOutputCommitter ext
     JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
     TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
     FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
-    committer.setupJob(tContext2);
-    Path jobTempDir2 = new Path(outDir, 
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf2.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir2.toString()).exists()));
+    committer2.setupJob(tContext2);
+    Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
+    File jtd2 = new File(jobTempDir2.toUri().getPath());
     
-    tContext2.getConfiguration().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
     committer2.recoverTask(tContext2);
-    validateContent(jobTempDir2);
+    assertTrue(jtd2.exists());
+    validateContent(jtd2);
     
     committer2.commitJob(jContext2);
     validateContent(outDir);
@@ -144,7 +157,12 @@ public class TestFileOutputCommitter ext
   }
 
   private void validateContent(Path dir) throws IOException {
-    File expectedFile = new File(new Path(dir, partFile).toString());
+    validateContent(new File(dir.toUri().getPath()));
+  }
+  
+  private void validateContent(File dir) throws IOException {
+    File expectedFile = new File(dir, partFile);
+    assertTrue("Could not find "+expectedFile, expectedFile.exists());
     StringBuffer expectedOutput = new StringBuffer();
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
@@ -259,7 +277,7 @@ public class TestFileOutputCommitter ext
     assertFalse("task temp dir still exists", expectedFile.exists());
 
     committer.abortJob(jContext, JobStatus.State.FAILED);
-    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+    expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME)
         .toString());
     assertFalse("job temp dir still exists", expectedFile.exists());
     assertEquals("Output directory not empty", 0, new File(outDir.toString())
@@ -315,12 +333,10 @@ public class TestFileOutputCommitter ext
     assertNotNull(th);
     assertTrue(th instanceof IOException);
     assertTrue(th.getMessage().contains("fake delete failed"));
-    File jobTmpDir = new File(new Path(outDir,
-        FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-        conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0) +
-        Path.SEPARATOR +
-        FileOutputCommitter.TEMP_DIR_NAME).toString());
-    File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+    Path jtd = committer.getJobAttemptPath(jContext);
+    File jobTmpDir = new File(jtd.toUri().getPath());
+    Path ttd = committer.getTaskAttemptPath(tContext);
+    File taskTmpDir = new File(ttd.toUri().getPath());
     File expectedFile = new File(taskTmpDir, partFile);
     assertTrue(expectedFile + " does not exists", expectedFile.exists());
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Feb  4 00:04:53 2012
@@ -74,7 +74,7 @@ public class TestFileOutputCommitter ext
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, 
-      committer.getTempTaskOutputPath(tContext));
+      committer.getTaskAttemptPath(tContext));
 
     committer.setupJob(jContext);
     committer.setupTask(tContext);
@@ -115,7 +115,7 @@ public class TestFileOutputCommitter ext
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, committer
-        .getTempTaskOutputPath(tContext));
+        .getTaskAttemptPath(tContext));
 
     // do setup
     committer.setupJob(jContext);
@@ -134,13 +134,13 @@ public class TestFileOutputCommitter ext
     // do abort
     committer.abortTask(tContext);
     File expectedFile = new File(new Path(committer
-        .getTempTaskOutputPath(tContext), file).toString());
+        .getTaskAttemptPath(tContext), file).toString());
     assertFalse("task temp dir still exists", expectedFile.exists());
 
     committer.abortJob(jContext, JobStatus.State.FAILED);
     expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
         .toString());
-    assertFalse("job temp dir still exists", expectedFile.exists());
+    assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists());
     assertEquals("Output directory not empty", 0, new File(outDir.toString())
         .listFiles().length);
     FileUtil.fullyDelete(new File(outDir.toString()));
@@ -170,16 +170,15 @@ public class TestFileOutputCommitter ext
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, committer
-        .getTempTaskOutputPath(tContext));
+        .getTaskAttemptPath(tContext));
 
     // do setup
     committer.setupJob(jContext);
     committer.setupTask(tContext);
     
     String file = "test.txt";
-    String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
-    File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
-    File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
+    File jobTmpDir = new File(committer.getJobAttemptPath(jContext).toUri().getPath());
+    File taskTmpDir = new File(committer.getTaskAttemptPath(tContext).toUri().getPath());
     File expectedFile = new File(taskTmpDir, file);
 
     // A reporter that does nothing

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1240413&r1=1240412&r2=1240413&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java Sat Feb  4 00:04:53 2012
@@ -34,7 +34,7 @@ public class TestTaskCommit extends Hado
 
   static class CommitterWithCommitFail extends FileOutputCommitter {
     public void commitTask(TaskAttemptContext context) throws IOException {
-      Path taskOutputPath = getTempTaskOutputPath(context);
+      Path taskOutputPath = getTaskAttemptPath(context);
       TaskAttemptID attemptId = context.getTaskAttemptID();
       JobConf job = context.getJobConf();
       if (taskOutputPath != null) {