You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/10/05 14:15:45 UTC

svn commit: r1179188 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/ha...

Author: vinodkv
Date: Wed Oct  5 12:15:44 2011
New Revision: 1179188

URL: http://svn.apache.org/viewvc?rev=1179188&view=rev
Log:
MAPREDUCE-2702. Added a new API in OutputCommitter for recovering the outputs of tasks from a crashed job so as to support MR Application Master recovery. Contributed by Sharad Agarwal and Arun C Murthy.

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1179188&r1=1179187&r2=1179188&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Oct  5 12:15:44 2011
@@ -347,6 +347,10 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2907. Changed log level for various messages in ResourceManager
     from INFO to DEBUG. (Ravi Prakash via vinodkv)
 
+    MAPREDUCE-2702. Added a new API in OutputCommitter for recovering
+    the outputs of tasks from a crashed job so as to support MR Application
+    Master recovery. (Sharad Agarwal and Arun C Murthy via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1179188&r1=1179187&r2=1179188&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Wed Oct  5 12:15:44 2011
@@ -473,4 +473,6 @@ public interface MRJobConfig {
   public static final String MAPREDUCE_V2_CHILD_CLASS = 
       "org.apache.hadoop.mapred.YarnChild";
 
+  public static final String APPLICATION_ATTEMPT_ID =
+      "mapreduce.job.application.attempt.id";
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=1179188&r1=1179187&r2=1179188&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java Wed Oct  5 12:15:44 2011
@@ -143,4 +143,35 @@ public abstract class OutputCommitter {
    */
   public abstract void abortTask(TaskAttemptContext taskContext)
   throws IOException;
+
+  /**
+   * Is task output recovery supported for restarting jobs?
+   * 
+   * If task output recovery is supported, job restart can be done more 
+   * efficiently. The default implementation returns <code>false</code>.
+   * 
+   * @return <code>true</code> if task output recovery is supported,
+   *         <code>false</code> otherwise
+   * @see #recoverTask(TaskAttemptContext)
+   */
+  public boolean isRecoverySupported() {
+    return false;
+  }
+  
+  /**
+   * Recover the task output. The default implementation does nothing.
+   * 
+   * The retry-count for the job will be passed via the 
+   * {@link MRJobConfig#APPLICATION_ATTEMPT_ID} key in  
+   * {@link TaskAttemptContext#getConfiguration()} for the 
+   * <code>OutputCommitter</code>.
+   * 
+   * If an exception is thrown the task will be attempted again. 
+   * 
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException if the task output cannot be recovered
+   */
+  public void recoverTask(TaskAttemptContext taskContext)
+  throws IOException
+  {}
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1179188&r1=1179187&r2=1179188&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Wed Oct  5 12:15:44 2011
@@ -35,7 +35,6 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.util.StringUtils;
 
 /** An {@link OutputCommitter} that commits files specified 
  * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 
@@ -69,9 +68,8 @@ public class FileOutputCommitter extends
       this.outputPath = outputPath;
       outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
       workPath = new Path(outputPath,
-                          (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-                           "_" + context.getTaskAttemptID().toString()
-                           )).makeQualified(outputFileSystem);
+                          getTaskAttemptBaseDirName(context))
+                          .makeQualified(outputFileSystem);
     }
   }
 
@@ -82,7 +80,8 @@ public class FileOutputCommitter extends
    */
   public void setupJob(JobContext context) throws IOException {
     if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + 
+    		  Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
       if (!fileSys.mkdirs(tmpDir)) {
         LOG.error("Mkdirs failed to create " + tmpDir.toString());
@@ -106,11 +105,27 @@ public class FileOutputCommitter extends
   }
   
   /**
+   * Move all job output to the final place.
    * Delete the temporary directory, including all of the work directories.
    * Create a _SUCCESS file to make it as successful.
    * @param context the job's context
    */
   public void commitJob(JobContext context) throws IOException {
+    //delete the task temp directory from the current jobtempdir
+    Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+        Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+    FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+    if (fileSys.exists(tmpDir)) {
+      fileSys.delete(tmpDir, true);
+    } else {
+      LOG.warn("Task temp dir could not be deleted " + tmpDir);
+    }
+    
+	  //move the job output to final place
+    Path jobOutputPath = 
+        new Path(outputPath, getJobAttemptBaseDirName(context));
+	  moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
+	  
     // delete the _temporary folder and create a _done file in the o/p folder
     cleanupJob(context);
     if (shouldMarkOutputDir(context.getConfiguration())) {
@@ -118,6 +133,31 @@ public class FileOutputCommitter extends
     }
   }
 
+  private void moveJobOutputs(FileSystem fs,
+      Path finalOutputDir, Path jobOutput) throws IOException {
+    if (fs.isFile(jobOutput)) { // plain file: rename it to its final path
+      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      if (!fs.rename(jobOutput, finalOutputPath)) {
+        if (!fs.delete(finalOutputPath, true)) { // rename failed: clear the target
+          throw new IOException("Failed to delete earlier output of job");
+        }
+        if (!fs.rename(jobOutput, finalOutputPath)) { // single retry after delete
+          throw new IOException("Failed to save output of job");
+        }
+      }
+      LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
+    } else if (fs.getFileStatus(jobOutput).isDirectory()) { // directory: recurse
+      FileStatus[] paths = fs.listStatus(jobOutput);
+      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      fs.mkdirs(finalOutputPath); // NOTE(review): mkdirs result is not checked
+      if (paths != null) {
+        for (FileStatus path : paths) {
+          moveJobOutputs(fs, finalOutputDir, path.getPath());
+        }
+      }
+    }
+  }
+
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
@@ -163,8 +203,10 @@ public class FileOutputCommitter extends
     if (workPath != null) {
       context.progress();
       if (outputFileSystem.exists(workPath)) {
-        // Move the task outputs to their final place
-        moveTaskOutputs(context, outputFileSystem, outputPath, workPath);
+        // Move the task outputs to the current job attempt output dir
+    	  Path jobOutputPath = 
+    	      new Path(outputPath, getJobAttemptBaseDirName(context));
+        moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
         // Delete the temporary task-specific output directory
         if (!outputFileSystem.delete(workPath, true)) {
           LOG.warn("Failed to delete the temporary output" + 
@@ -271,4 +313,50 @@ public class FileOutputCommitter extends
   public Path getWorkPath() throws IOException {
     return workPath;
   }
+
+  @Override
+  public boolean isRecoverySupported() {
+    return true; // recoverTask() carries over a previous attempt's output
+  }
+  
+  @Override
+  public void recoverTask(TaskAttemptContext context)
+      throws IOException {
+    context.progress();
+    if (outputPath == null) {
+      return; // no output directory configured, so there is nothing to recover
+    }
+    int previousAttempt = context.getConfiguration().getInt(
+        MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
+    if (previousAttempt < 0) {
+      throw new IOException ("Cannot recover task output for first attempt...");
+    }
+    Path jobOutputPath = new Path(outputPath, getJobAttemptBaseDirName(context));
+    Path pathToRecover = 
+        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
+    if (outputFileSystem.exists(pathToRecover)) {
+      // Carry the previous attempt's output over to the current attempt's dir
+      moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
+      LOG.info("Saved output of job to " + jobOutputPath);
+    }
+  }
+
+  protected static String getJobAttemptBaseDirName(JobContext context) {
+    // Resolve the application attempt id from the job configuration; when
+    // the key is unset the attempt id defaults to 0.
+    return getJobAttemptBaseDirName(
+        context.getConfiguration().getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0));
+  }
+
+  protected static String getJobAttemptBaseDirName(int appAttemptId) {
+    // Layout: <TEMP_DIR_NAME>/<appAttemptId>, resolved against the output path.
+    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + appAttemptId;
+  }
+
+  protected static String getTaskAttemptBaseDirName(
+      TaskAttemptContext context) {
+    String taskDirName = "_" + context.getTaskAttemptID().toString();
+    return getJobAttemptBaseDirName(context) + Path.SEPARATOR // job attempt dir
+        + FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + taskDirName;
+  }
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1179188&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Wed Oct  5 12:15:44 2011
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+@SuppressWarnings("unchecked")
+public class TestFileOutputCommitter extends TestCase {
+  private static Path outDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), "output");
+
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
+  private static String partFile = "part-m-00000";
+  private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+  private Text key1 = new Text("key1");
+  private Text key2 = new Text("key2");
+  private Text val1 = new Text("val1");
+  private Text val2 = new Text("val2");
+
+  /** Writes a fixed set of key/value combinations, then closes the writer. */
+  private void writeOutput(RecordWriter theRecordWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    NullWritable nullWritable = NullWritable.get();
+
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(context); // close even if one of the writes throws
+    }
+  }
+
+  /** Commits a task under app attempt 1, then recovers it under attempt 2. */
+  public void testRecovery() throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    Path jobTempDir1 = new Path(outDir, 
+        FileOutputCommitter.getJobAttemptBaseDirName(
+            conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
+    assertTrue((new File(jobTempDir1.toString()).exists()));
+    validateContent(jobTempDir1);
+    
+    
+    
+    //now while running the second app attempt, 
+    //recover the task output from first attempt
+    Configuration conf2 = job.getConfiguration();
+    conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
+    JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
+    TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
+    FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
+    committer2.setupJob(jContext2);
+    Path jobTempDir2 = new Path(outDir, 
+        FileOutputCommitter.getJobAttemptBaseDirName(
+            conf2.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
+    assertTrue((new File(jobTempDir2.toString()).exists()));
+    
+    tContext2.getConfiguration().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
+    committer2.recoverTask(tContext2);
+    validateContent(jobTempDir2);
+    
+    committer2.commitJob(jContext2);
+    validateContent(outDir);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  private void validateContent(Path dir) throws IOException {
+    // Compares dir/partFile against the text expected from writeOutput()
+    File outputFile = new File(new Path(dir, partFile).toString());
+    StringBuilder expectedOutput = new StringBuilder();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    assertEquals(expectedOutput.toString(), slurp(outputFile));
+  }
+
+  /** Happy path: commit one task and the job, then check the final output. */
+  public void testCommitter() throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output: the part file must now sit directly under outDir
+    validateContent(outDir);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  /** Aborting task then job must remove temp dirs and leave outDir empty. */
+  public void testAbort() throws IOException, InterruptedException {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do abort: the part file in the task work dir must be gone afterwards
+    committer.abortTask(tContext);
+    File expectedFile = new File(new Path(committer.getWorkPath(), partFile)
+        .toString());
+    assertFalse("task temp dir still exists", expectedFile.exists());
+
+    committer.abortJob(jContext, JobStatus.State.FAILED);
+    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+        .toString());
+    assertFalse("job temp dir still exists", expectedFile.exists());
+    assertEquals("Output directory not empty", 0, new File(outDir.toString())
+        .listFiles().length);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  /** Local file system served under the "faildel" scheme whose deletes fail. */
+  public static class FakeFileSystem extends RawLocalFileSystem {
+    // The implicit public no-arg constructor is sufficient; none is declared.
+    @Override
+    public URI getUri() {
+      return URI.create("faildel:///");
+    }
+
+    /** Always throws, so callers exercise their delete-failure handling. */
+    @Override
+    public boolean delete(Path p, boolean recursive) throws IOException {
+      throw new IOException("fake delete failed");
+    }
+  }
+
+  /** Aborts must surface the IOException when the file system cannot delete. */
+  public void testFailAbort() throws IOException, InterruptedException {
+    Job job = Job.getInstance();
+    Configuration conf = job.getConfiguration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
+    conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    FileOutputFormat.setOutputPath(job, outDir);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+    RecordWriter<?, ?> theRecordWriter = theOutputFormat
+        .getRecordWriter(tContext);
+    writeOutput(theRecordWriter, tContext);
+
+    // do abort: FakeFileSystem.delete() throws, so abortTask must fail
+    Throwable th = null;
+    try {
+      committer.abortTask(tContext);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    File jobTmpDir = new File(new Path(outDir,
+        FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+        conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0) +
+        Path.SEPARATOR +
+        FileOutputCommitter.TEMP_DIR_NAME).toString());
+    File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+    File expectedFile = new File(taskTmpDir, partFile);
+    assertTrue(expectedFile + " does not exists", expectedFile.exists());
+
+    th = null; // reset before checking that abortJob fails the same way
+    try {
+      committer.abortJob(jContext, JobStatus.State.FAILED);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    assertTrue("job temp dir does not exists", jobTmpDir.exists());
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  /** Reads the whole of {@code f} and decodes it as UTF-8. */
+  public static String slurp(File f) throws IOException {
+    byte[] buf = new byte[(int) f.length()];
+    FileInputStream in = new FileInputStream(f);
+    try {
+      // read() may return short counts; readFully() fills buf or throws
+      // EOFException. Fully qualified to avoid touching the import list.
+      new java.io.DataInputStream(in).readFully(buf);
+    } finally {
+      in.close();
+    }
+    return new String(buf, "UTF-8");
+  }
+
+}