Posted to common-commits@hadoop.apache.org by ge...@apache.org on 2015/03/10 19:39:29 UTC

hadoop git commit: MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output files. (Siqi Li via gera)

Repository: hadoop
Updated Branches:
  refs/heads/trunk a380643d2 -> aa92b764a


MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output files. (Siqi Li via gera)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/aa92b764
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/aa92b764
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/aa92b764

Branch: refs/heads/trunk
Commit: aa92b764a7ddb888d097121c4d610089a0053d11
Parents: a380643
Author: Gera Shegalov <ge...@apache.org>
Authored: Tue Mar 10 11:12:48 2015 -0700
Committer: Gera Shegalov <ge...@apache.org>
Committed: Tue Mar 10 11:32:08 2015 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../lib/output/FileOutputCommitter.java         | 119 ++++++++++------
 .../src/main/resources/mapred-default.xml       |  54 ++++++++
 .../hadoop/mapred/TestFileOutputCommitter.java  | 134 +++++++++++++++----
 .../lib/output/TestFileOutputCommitter.java     | 116 ++++++++++++++--
 5 files changed, 349 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa92b764/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index eecf022..0bbe85c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -348,6 +348,9 @@ Release 2.7.0 - UNRELEASED
 
     MAPREDUCE-6059. Speed up history server startup time (Siqi Li via aw)
 
+    MAPREDUCE-4815. Speed up FileOutputCommitter#commitJob for many output
+    files. (Siqi Li via gera)
+
   BUG FIXES
 
     MAPREDUCE-6210. Use getApplicationAttemptId() instead of getApplicationId()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa92b764/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 55252f0..28a8548 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.lib.output;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
@@ -25,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -57,10 +59,14 @@ public class FileOutputCommitter extends OutputCommitter {
   @Deprecated
   protected static final String TEMP_DIR_NAME = PENDING_DIR_NAME;
   public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
-  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = 
-    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+      "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+  public static final String FILEOUTPUTCOMMITTER_ALGORITHM_VERSION =
+      "mapreduce.fileoutputcommitter.algorithm.version";
+  public static final int FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT = 1;
   private Path outputPath = null;
   private Path workPath = null;
+  private final int algorithmVersion;
 
   /**
    * Create a file output committer
@@ -87,6 +93,14 @@ public class FileOutputCommitter extends OutputCommitter {
   @Private
   public FileOutputCommitter(Path outputPath, 
                              JobContext context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    algorithmVersion =
+        conf.getInt(FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+                    FILEOUTPUTCOMMITTER_ALGORITHM_VERSION_DEFAULT);
+    LOG.info("File Output Committer Algorithm version is " + algorithmVersion);
+    if (algorithmVersion != 1 && algorithmVersion != 2) {
+      throw new IOException("Only 1 or 2 algorithm version is supported");
+    }
     if (outputPath != null) {
       FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
       this.outputPath = fs.makeQualified(outputPath);
@@ -248,14 +262,14 @@ public class FileOutputCommitter extends OutputCommitter {
     return new Path(getJobAttemptPath(appAttemptId, out),
         String.valueOf(context.getTaskAttemptID().getTaskID()));
   }
-  
+
   private static class CommittedTaskFilter implements PathFilter {
     @Override
     public boolean accept(Path path) {
       return !PENDING_DIR_NAME.equals(path.getName());
     }
   }
-  
+
   /**
    * Get a list of all paths where output from committed tasks are stored.
    * @param context the context of the current job
@@ -268,7 +282,7 @@ public class FileOutputCommitter extends OutputCommitter {
     FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
     return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
   }
-  
+
   /**
    * Get the directory that the task should write results into.
    * @return the work directory
@@ -295,7 +309,7 @@ public class FileOutputCommitter extends OutputCommitter {
       LOG.warn("Output Path is null in setupJob()");
     }
   }
-  
+
   /**
    * The job has completed so move all committed tasks to the final output dir.
    * Delete the temporary directory, including all of the work directories.
@@ -306,8 +320,11 @@ public class FileOutputCommitter extends OutputCommitter {
     if (hasOutputPath()) {
       Path finalOutput = getOutputPath();
       FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
-      for(FileStatus stat: getAllCommittedTaskPaths(context)) {
-        mergePaths(fs, stat, finalOutput);
+
+      if (algorithmVersion == 1) {
+        for (FileStatus stat: getAllCommittedTaskPaths(context)) {
+          mergePaths(fs, stat, finalOutput);
+        }
       }
 
       // delete the _temporary folder and create a _done file in the o/p folder
@@ -417,27 +434,42 @@ public class FileOutputCommitter extends OutputCommitter {
 
   @Private
   public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
-  throws IOException {
+      throws IOException {
+
     TaskAttemptID attemptId = context.getTaskAttemptID();
     if (hasOutputPath()) {
       context.progress();
       if(taskAttemptPath == null) {
         taskAttemptPath = getTaskAttemptPath(context);
       }
-      Path committedTaskPath = getCommittedTaskPath(context);
       FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
-      if (fs.exists(taskAttemptPath)) {
-        if(fs.exists(committedTaskPath)) {
-          if(!fs.delete(committedTaskPath, true)) {
-            throw new IOException("Could not delete " + committedTaskPath);
+      FileStatus taskAttemptDirStatus;
+      try {
+        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
+      } catch (FileNotFoundException e) {
+        taskAttemptDirStatus = null;
+      }
+
+      if (taskAttemptDirStatus != null) {
+        if (algorithmVersion == 1) {
+          Path committedTaskPath = getCommittedTaskPath(context);
+          if (fs.exists(committedTaskPath)) {
+            if (!fs.delete(committedTaskPath, true)) {
+              throw new IOException("Could not delete " + committedTaskPath);
+            }
           }
+          if (!fs.rename(taskAttemptPath, committedTaskPath)) {
+            throw new IOException("Could not rename " + taskAttemptPath + " to "
+                + committedTaskPath);
+          }
+          LOG.info("Saved output of task '" + attemptId + "' to " +
+              committedTaskPath);
+        } else {
+          // directly merge everything from taskAttemptPath to the output directory
+          mergePaths(fs, taskAttemptDirStatus, outputPath);
+          LOG.info("Saved output of task '" + attemptId + "' to " +
+              outputPath);
         }
-        if(!fs.rename(taskAttemptPath, committedTaskPath)) {
-          throw new IOException("Could not rename " + taskAttemptPath + " to "
-              + committedTaskPath);
-        }
-        LOG.info("Saved output of task '" + attemptId + "' to " + 
-            committedTaskPath);
       } else {
         LOG.warn("No Output found for " + attemptId);
       }
@@ -511,32 +543,43 @@ public class FileOutputCommitter extends OutputCommitter {
         throw new IOException ("Cannot recover task output for first attempt...");
       }
 
-      Path committedTaskPath = getCommittedTaskPath(context);
       Path previousCommittedTaskPath = getCommittedTaskPath(
           previousAttempt, context);
-      FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
+      FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
 
-      LOG.debug("Trying to recover task from " + previousCommittedTaskPath 
-          + " into " + committedTaskPath);
-      if (fs.exists(previousCommittedTaskPath)) {
-        if(fs.exists(committedTaskPath)) {
-          if(!fs.delete(committedTaskPath, true)) {
-            throw new IOException("Could not delete "+committedTaskPath);
+      LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
+      if (algorithmVersion == 1) {
+        if (fs.exists(previousCommittedTaskPath)) {
+          Path committedTaskPath = getCommittedTaskPath(context);
+          if (fs.exists(committedTaskPath)) {
+            if (!fs.delete(committedTaskPath, true)) {
+              throw new IOException("Could not delete "+committedTaskPath);
+            }
           }
+          //Rename can fail if the parent directory does not yet exist.
+          Path committedParent = committedTaskPath.getParent();
+          fs.mkdirs(committedParent);
+          if (!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+            throw new IOException("Could not rename " + previousCommittedTaskPath +
+                " to " + committedTaskPath);
+          }
+        } else {
+          LOG.warn(attemptId + " had no output to recover.");
         }
-        //Rename can fail if the parent directory does not yet exist.
-        Path committedParent = committedTaskPath.getParent();
-        fs.mkdirs(committedParent);
-        if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
-          throw new IOException("Could not rename " + previousCommittedTaskPath +
-              " to " + committedTaskPath);
-        }
-        LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
       } else {
-        LOG.warn(attemptId+" had no output to recover.");
+        // essentially a no-op, but kept for backwards compatibility:
+        // after an upgrade to the new FileOutputCommitter, check
+        // whether any output is left in previousCommittedTaskPath
+        if (fs.exists(previousCommittedTaskPath)) {
+          LOG.info("Recovering task for upgrading scenario, moving files from "
+              + previousCommittedTaskPath + " to " + outputPath);
+          FileStatus from = fs.getFileStatus(previousCommittedTaskPath);
+          mergePaths(fs, from, outputPath);
+        }
+        LOG.info("Done recovering task " + attemptId);
       }
     } else {
       LOG.warn("Output Path is null in recoverTask()");
     }
   }
-}
+}
\ No newline at end of file
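
A minimal sketch of opting a job into the new algorithm via the key added
above. The class name and output path are hypothetical; the configuration
key and the 1-or-2 constraint come straight from the constructor in this
commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CommitterV2Example {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        FileOutputFormat.setOutputPath(job, new Path("/user/alice/out"));
        Configuration conf = job.getConfiguration();
        // Select the v2 commit algorithm added by MAPREDUCE-4815; the
        // FileOutputCommitter constructor throws IOException for any
        // value other than 1 or 2.
        conf.setInt(
            FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);
      }
    }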

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa92b764/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 1162183..d7bec9c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1318,6 +1318,60 @@
 </property>
 
 <property>
+  <name>mapreduce.fileoutputcommitter.algorithm.version</name>
+  <value>1</value>
+  <description>The file output committer algorithm version.
+  Valid version numbers are 1 and 2; the default is 1, which is
+  the original algorithm.
+
+  In algorithm version 1,
+
+  1. commitTask renames the directory
+  $joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
+  to
+  $joboutput/_temporary/$appAttemptID/$taskID/
+
+  2. recoverTask also does a rename, from
+  $joboutput/_temporary/$appAttemptID/$taskID/
+  to
+  $joboutput/_temporary/($appAttemptID + 1)/$taskID/
+
+  3. commitJob merges every task output file in
+  $joboutput/_temporary/$appAttemptID/$taskID/
+  into
+  $joboutput/, then deletes $joboutput/_temporary/
+  and writes $joboutput/_SUCCESS
+
+  Version 1 has a performance regression, discussed in MAPREDUCE-4815:
+  if a job generates many files to commit, the commitJob call at the
+  end of the job can take minutes, because the commit is
+  single-threaded and waits until all tasks have completed before
+  commencing.
+
+  Algorithm version 2 changes the behavior of commitTask,
+  recoverTask, and commitJob.
+
+  1. commitTask renames all files in
+  $joboutput/_temporary/$appAttemptID/_temporary/$taskAttemptID/
+  to $joboutput/
+
+  2. recoverTask does not actually need to do anything, but to handle
+  an upgrade from version 1 to version 2 it checks whether there are
+  any files in
+  $joboutput/_temporary/($appAttemptID - 1)/$taskID/
+  and renames them to $joboutput/
+
+  3. commitJob simply deletes $joboutput/_temporary and writes
+  $joboutput/_SUCCESS
+
+  Version 2 reduces the output commit time for large jobs by having
+  the tasks commit directly to the final output directory as they
+  complete, leaving commitJob very little to do.
+  </description>
+</property>
+
+<property>
   <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
   <value>1000</value>
   <description>The interval in ms at which the MR AppMaster should send

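To make the description above concrete, here is the sequence of moves for a
hypothetical job with output directory /out, application attempt 1, and a
single map task (the task and attempt IDs below are made up):

    v1 commitTask: /out/_temporary/1/_temporary/attempt_1418_0001_m_000000_0/part-m-00000
                   -> /out/_temporary/1/task_1418_0001_m_000000/part-m-00000
    v1 commitJob:  merges each /out/_temporary/1/task_*/ into /out/, one
                   task at a time, then deletes /out/_temporary/ and
                   writes /out/_SUCCESS

    v2 commitTask: /out/_temporary/1/_temporary/attempt_1418_0001_m_000000_0/part-m-00000
                   -> /out/part-m-00000
    v2 commitJob:  deletes /out/_temporary/ and writes /out/_SUCCESS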
http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa92b764/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
index 0859571..3207a71 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
@@ -50,7 +50,6 @@ public class TestFileOutputCommitter extends TestCase {
   private Text val1 = new Text("val1");
   private Text val2 = new Text("val2");
 
-  
   private void writeOutput(RecordWriter theRecordWriter,
       TaskAttemptContext context) throws IOException, InterruptedException {
     NullWritable nullWritable = NullWritable.get();
@@ -83,12 +82,16 @@ public class TestFileOutputCommitter extends TestCase {
       theRecordWriter.close(null);
     }
   }
-  
-  public void testRecovery() throws Exception {
-    JobConf conf = new JobConf();
+
+  private void testRecoveryInternal(int commitVersion, int recoveryVersion)
+      throws Exception {
+    JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
     conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        commitVersion);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
@@ -99,7 +102,7 @@ public class TestFileOutputCommitter extends TestCase {
 
     // write output
     TextOutputFormat theOutputFormat = new TextOutputFormat();
-    RecordWriter theRecordWriter = 
+    RecordWriter theRecordWriter =
         theOutputFormat.getRecordWriter(null, conf, partFile, null);
     writeOutput(theRecordWriter, tContext);
 
@@ -107,31 +110,59 @@ public class TestFileOutputCommitter extends TestCase {
     if(committer.needsTaskCommit(tContext)) {
       committer.commitTask(tContext);
     }
+
     Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
     File jtd1 = new File(jobTempDir1.toUri().getPath());
-    assertTrue(jtd1.exists());
-    validateContent(jobTempDir1);        
-    
-    //now while running the second app attempt, 
+    if (commitVersion == 1) {
+      assertTrue("Version 1 commits to temporary dir " + jtd1, jtd1.exists());
+      validateContent(jobTempDir1);
+    } else {
+      assertFalse("Version 2 commits to output dir " + jtd1, jtd1.exists());
+    }
+
+    //now while running the second app attempt,
     //recover the task output from first attempt
     JobConf conf2 = new JobConf(conf);
     conf2.set(JobContext.TASK_ATTEMPT_ID, attempt);
     conf2.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
+    conf2.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        recoveryVersion);
     JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
     TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
     FileOutputCommitter committer2 = new FileOutputCommitter();
     committer2.setupJob(jContext2);
-    Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
-    
+
     committer2.recoverTask(tContext2);
+
+    Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
     File jtd2 = new File(jobTempDir2.toUri().getPath());
-    assertTrue(jtd2.exists());
-    validateContent(jobTempDir2);
-    
+    if (recoveryVersion == 1) {
+      assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
+      validateContent(jobTempDir2);
+    } else {
+      assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
+      if (commitVersion == 1) {
+        assertTrue("Version 2 recovery moves to output dir from "
+            + jtd1, jtd1.list().length == 0);
+      }
+    }
+
     committer2.commitJob(jContext2);
     validateContent(outDir);
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
+
+  public void testRecoveryV1() throws Exception {
+    testRecoveryInternal(1, 1);
+  }
+
+  public void testRecoveryV2() throws Exception {
+    testRecoveryInternal(2, 2);
+  }
+
+  public void testRecoveryUpgradeV1V2() throws Exception {
+    testRecoveryInternal(1, 2);
+  }
 
   private void validateContent(Path dir) throws IOException {
     File fdir = new File(dir.toUri().getPath());
@@ -170,11 +201,13 @@ public class TestFileOutputCommitter extends TestCase {
     assert(fileCount > 0);
     assert(dataFileFound && indexFileFound);
   }
-  
-  public void testCommitter() throws Exception {
+
+  private void testCommitterInternal(int version) throws Exception {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
@@ -200,21 +233,33 @@ public class TestFileOutputCommitter extends TestCase {
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
-  public void testMapFileOutputCommitter() throws Exception {
+  public void testCommitterV1() throws Exception {
+    testCommitterInternal(1);
+  }
+
+  public void testCommitterV2() throws Exception {
+    testCommitterInternal(2);
+  }
+
+  private void testMapFileOutputCommitterInternal(int version)
+      throws Exception {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
-    FileOutputCommitter committer = new FileOutputCommitter();    
-    
+    FileOutputCommitter committer = new FileOutputCommitter();
+
     // setup
     committer.setupJob(jContext);
     committer.setupTask(tContext);
 
     // write output
     MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
-    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    RecordWriter theRecordWriter =
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
     writeMapFileOutput(theRecordWriter, tContext);
 
     // do commit
@@ -227,11 +272,29 @@ public class TestFileOutputCommitter extends TestCase {
     validateMapFileOutputContent(FileSystem.get(conf), outDir);
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
-  
-  public void testMapOnlyNoOutput() throws Exception {
+
+  public void testMapFileOutputCommitterV1() throws Exception {
+    testMapFileOutputCommitterInternal(1);
+  }
+
+  public void testMapFileOutputCommitterV2() throws Exception {
+    testMapFileOutputCommitterInternal(2);
+  }
+
+  public void testMapOnlyNoOutputV1() throws Exception {
+    testMapOnlyNoOutputInternal(1);
+  }
+
+  public void testMapOnlyNoOutputV2() throws Exception {
+    testMapOnlyNoOutputInternal(2);
+  }
+
+  private void testMapOnlyNoOutputInternal(int version) throws Exception {
     JobConf conf = new JobConf();
     //This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
     conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();    
@@ -249,11 +312,14 @@ public class TestFileOutputCommitter extends TestCase {
     // validate output
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
-  
-  public void testAbort() throws IOException, InterruptedException {
+
+  private void testAbortInternal(int version)
+      throws IOException, InterruptedException {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
@@ -283,6 +349,14 @@ public class TestFileOutputCommitter extends TestCase {
     FileUtil.fullyDelete(out);
   }
 
+  public void testAbortV1() throws Exception {
+    testAbortInternal(1);
+  }
+
+  public void testAbortV2() throws Exception {
+    testAbortInternal(2);
+  }
+
   public static class FakeFileSystem extends RawLocalFileSystem {
     public FakeFileSystem() {
       super();
@@ -299,11 +373,14 @@ public class TestFileOutputCommitter extends TestCase {
   }
 
   
-  public void testFailAbort() throws IOException, InterruptedException {
+  private void testFailAbortInternal(int version)
+      throws IOException, InterruptedException {
     JobConf conf = new JobConf();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
     conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
     conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(org.apache.hadoop.mapreduce.lib.output.
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, version);
     conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
     FileOutputFormat.setOutputPath(conf, outDir);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
@@ -353,6 +430,13 @@ public class TestFileOutputCommitter extends TestCase {
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
+  public void testFailAbortV1() throws Exception {
+    testFailAbortInternal(1);
+  }
+
+  public void testFailAbortV2() throws Exception {
+    testFailAbortInternal(2);
+  }
+
   public static String slurp(File f) throws IOException {
     int len = (int) f.length();
     byte[] buf = new byte[len];
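
Condensed, the contract these reworked tests pin down is the following
(variable names are borrowed from testRecoveryInternal above; the version
check is illustrative, not part of the committer API):

    committer.commitTask(tContext);
    if (commitVersion == 1) {
      // v1: output parks under outDir/_temporary/<appAttempt>/<taskID>
      // until commitJob merges it into outDir
      assertTrue(jtd1.exists());
    } else {
      // v2: commitTask has already merged the files into outDir
      assertFalse(jtd1.exists());
    }
    committer.commitJob(jContext); // v1: merge + cleanup; v2: cleanup + _SUCCESS
    validateContent(outDir);       // both versions end with identical output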

http://git-wip-us.apache.org/repos/asf/hadoop/blob/aa92b764/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index d20480e..8f60300 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -109,12 +109,15 @@ public class TestFileOutputCommitter extends TestCase {
     }
   }
   
-  public void testRecovery() throws Exception {
+  private void testRecoveryInternal(int commitVersion, int recoveryVersion)
+      throws Exception {
     Job job = Job.getInstance();
     FileOutputFormat.setOutputPath(job, outDir);
     Configuration conf = job.getConfiguration();
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
     conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        commitVersion);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -130,32 +133,59 @@ public class TestFileOutputCommitter extends TestCase {
 
     // do commit
     committer.commitTask(tContext);
+
     Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
     File jtd = new File(jobTempDir1.toUri().getPath());
-    assertTrue(jtd.exists());
-    validateContent(jtd);    
-    
+    if (commitVersion == 1) {
+      assertTrue("Version 1 commits to temporary dir " + jtd, jtd.exists());
+      validateContent(jtd);
+    } else {
+      assertFalse("Version 2 commits to output dir " + jtd, jtd.exists());
+    }
+
     //now while running the second app attempt, 
     //recover the task output from first attempt
     Configuration conf2 = job.getConfiguration();
     conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
     conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
+    conf2.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        recoveryVersion);
     JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
     TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
     FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
     committer2.setupJob(tContext2);
     Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
     File jtd2 = new File(jobTempDir2.toUri().getPath());
-    
+
     committer2.recoverTask(tContext2);
-    assertTrue(jtd2.exists());
-    validateContent(jtd2);
-    
+    if (recoveryVersion == 1) {
+      assertTrue("Version 1 recovers to " + jtd2, jtd2.exists());
+      validateContent(jtd2);
+    } else {
+      assertFalse("Version 2 commits to output dir " + jtd2, jtd2.exists());
+      if (commitVersion == 1) {
+        assertTrue("Version 2  recovery moves to output dir from "
+            + jtd , jtd.list().length == 0);
+      }
+    }
+
     committer2.commitJob(jContext2);
     validateContent(outDir);
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
+  public void testRecoveryV1() throws Exception {
+    testRecoveryInternal(1, 1);
+  }
+
+  public void testRecoveryV2() throws Exception {
+    testRecoveryInternal(2, 2);
+  }
+
+  public void testRecoveryUpgradeV1V2() throws Exception {
+    testRecoveryInternal(1, 2);
+  }
+
   private void validateContent(Path dir) throws IOException {
     validateContent(new File(dir.toUri().getPath()));
   }
@@ -197,12 +227,14 @@ public class TestFileOutputCommitter extends TestCase {
     assert(fileCount > 0);
     assert(dataFileFound && indexFileFound);
   }
-  
-  public void testCommitter() throws Exception {
-    Job job = Job.getInstance();
+
+  private void testCommitterInternal(int version) throws Exception {
+    Job job = Job.getInstance();
     FileOutputFormat.setOutputPath(job, outDir);
     Configuration conf = job.getConfiguration();
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -225,11 +257,22 @@ public class TestFileOutputCommitter extends TestCase {
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
-  public void testMapFileOutputCommitter() throws Exception {
+  public void testCommitterV1() throws Exception {
+    testCommitterInternal(1);
+  }
+
+  public void testCommitterV2() throws Exception {
+    testCommitterInternal(2);
+  }
+
+  private void testMapFileOutputCommitterInternal(int version)
+      throws Exception {
     Job job = Job.getInstance();
     FileOutputFormat.setOutputPath(job, outDir);
     Configuration conf = job.getConfiguration();
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());    
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -251,12 +294,38 @@ public class TestFileOutputCommitter extends TestCase {
     validateMapFileOutputContent(FileSystem.get(job.getConfiguration()), outDir);
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
+
+  public void testMapFileOutputCommitterV1() throws Exception {
+    testMapFileOutputCommitterInternal(1);
+  }
   
-  public void testAbort() throws IOException, InterruptedException {
+  public void testMapFileOutputCommitterV2() throws Exception {
+    testMapFileOutputCommitterInternal(2);
+  }
+
+  public void testInvalidVersionNumber() throws IOException {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 3);
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    try {
+      new FileOutputCommitter(outDir, tContext);
+      fail("should've thrown an exception!");
+    } catch (IOException e) {
+      //test passed
+    }
+  }
+
+  private void testAbortInternal(int version)
+      throws IOException, InterruptedException {
     Job job = Job.getInstance();
     FileOutputFormat.setOutputPath(job, outDir);
     Configuration conf = job.getConfiguration();
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -285,6 +354,14 @@ public class TestFileOutputCommitter extends TestCase {
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
+  public void testAbortV1() throws IOException, InterruptedException {
+    testAbortInternal(1);
+  }
+
+  public void testAbortV2() throws IOException, InterruptedException {
+    testAbortInternal(2);
+  }
+
   public static class FakeFileSystem extends RawLocalFileSystem {
     public FakeFileSystem() {
       super();
@@ -301,13 +378,16 @@ public class TestFileOutputCommitter extends TestCase {
   }
 
   
-  public void testFailAbort() throws IOException, InterruptedException {
+  private void testFailAbortInternal(int version)
+      throws IOException, InterruptedException {
     Job job = Job.getInstance();
     Configuration conf = job.getConfiguration();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
     conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
     conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
     FileOutputFormat.setOutputPath(job, outDir);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
@@ -353,6 +433,14 @@ public class TestFileOutputCommitter extends TestCase {
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
+  public void testFailAbortV1() throws Exception {
+    testFailAbortInternal(1);
+  }
+
+  public void testFailAbortV2() throws Exception {
+    testFailAbortInternal(2);
+  }
+
   public static String slurp(File f) throws IOException {
     int len = (int) f.length();
     byte[] buf = new byte[len];