You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/10/11 20:27:51 UTC

svn commit: r1182008 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org...

Author: acmurthy
Date: Tue Oct 11 18:27:51 2011
New Revision: 1182008

URL: http://svn.apache.org/viewvc?rev=1182008&view=rev
Log:
MAPREDUCE-3148. Ported MAPREDUCE-2702 to old mapred api for aiding task recovery. 

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Oct 11 18:27:51 2011
@@ -362,6 +362,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-3161. Improved some javadocs and fixed some typos in
     YARN. (Todd Lipcon via vinodkv)
 
+    MAPREDUCE-3148. Ported MAPREDUCE-2702 to old mapred api for aiding task
+    recovery. (acmurthy) 
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Tue Oct 11 18:27:51 2011
@@ -160,7 +160,10 @@
      </Match>
      <Match>
        <Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
+       <Or>
        <Method name="commitJob" />
+       <Method name="recoverTask" />
+       </Or>
        <Bug pattern="NM_WRONG_PACKAGE" />
      </Match>
      <Match>
@@ -169,6 +172,7 @@
        <Method name="abortJob" />
        <Method name="commitJob" />
        <Method name="cleanupJob" />
+       <Method name="recoverTask" />
        </Or>
        <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
      </Match>

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -38,7 +38,8 @@ public class FileOutputCommitter extends
 
   public static final Log LOG = LogFactory.getLog(
       "org.apache.hadoop.mapred.FileOutputCommitter");
-/**
+  
+  /**
    * Temporary directory name 
    */
   public static final String TEMP_DIR_NAME = "_temporary";
@@ -50,7 +51,9 @@ public class FileOutputCommitter extends
     JobConf conf = context.getJobConf();
     Path outputPath = FileOutputFormat.getOutputPath(conf);
     if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      Path tmpDir = 
+          new Path(outputPath, getJobAttemptBaseDirName(context) + 
+              Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(conf);
       if (!fileSys.mkdirs(tmpDir)) {
         LOG.error("Mkdirs failed to create " + tmpDir.toString());
@@ -65,6 +68,24 @@ public class FileOutputCommitter extends
   }
   
   public void commitJob(JobContext context) throws IOException {
+    //delete the task temp directory from the current jobtempdir
+    JobConf conf = context.getJobConf();
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+    Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+        Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+    FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+    if (fileSys.exists(tmpDir)) {
+      fileSys.delete(tmpDir, true);
+    } else {
+      LOG.warn("Task temp dir could not be deleted " + tmpDir);
+    }
+    
+    //move the job output to final place
+    Path jobOutputPath = 
+        new Path(outputPath, getJobAttemptBaseDirName(context));
+    moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
+    
     // delete the _temporary folder in the output folder
     cleanupJob(context);
     // check if the output-dir marking is required
@@ -88,6 +109,30 @@ public class FileOutputCommitter extends
     }
   }
 
+  private void moveJobOutputs(FileSystem fs,
+      Path finalOutputDir, Path jobOutput) throws IOException {
+    if (fs.isFile(jobOutput)) {
+      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      if (!fs.rename(jobOutput, finalOutputPath)) {
+        if (!fs.delete(finalOutputPath, true)) {
+          throw new IOException("Failed to delete earlier output of job");
+        }
+        if (!fs.rename(jobOutput, finalOutputPath)) {
+          throw new IOException("Failed to save output of job");
+        }
+      }
+      LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
+    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
+      FileStatus[] paths = fs.listStatus(jobOutput);
+      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      fs.mkdirs(finalOutputPath);
+      if (paths != null) {
+        for (FileStatus path : paths) {
+          moveJobOutputs(fs, finalOutputDir, path.getPath());
+        }
+      }
+    }
+  }
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
@@ -128,9 +173,14 @@ public class FileOutputCommitter extends
       FileSystem fs = taskOutputPath.getFileSystem(job);
       context.getProgressible().progress();
       if (fs.exists(taskOutputPath)) {
-        Path jobOutputPath = taskOutputPath.getParent().getParent();
-        // Move the task outputs to their final place
-        moveTaskOutputs(context, fs, jobOutputPath, taskOutputPath);
+        // Move the task outputs to the current job attempt output dir
+        JobConf conf = context.getJobConf();
+        Path outputPath = FileOutputFormat.getOutputPath(conf);
+        FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+        Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
+        moveTaskOutputs(context, outputFileSystem, jobOutputPath, 
+            taskOutputPath);
+
         // Delete the temporary task-specific output directory
         if (!fs.delete(taskOutputPath, true)) {
           LOG.info("Failed to delete the temporary output" + 
@@ -189,7 +239,8 @@ public class FileOutputCommitter extends
                             Path taskOutputPath) throws IOException {
     URI taskOutputUri = taskOutput.toUri();
     URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
-    if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
+    if (taskOutputUri == relativePath) { 
+      //taskOutputPath is not a parent of taskOutput
       throw new IOException("Can not get the relative path: base = " + 
           taskOutputPath + " child = " + taskOutput);
     }
@@ -216,7 +267,8 @@ public class FileOutputCommitter extends
     return false;
   }
 
-  Path getTempTaskOutputPath(TaskAttemptContext taskContext) throws IOException {
+  Path getTempTaskOutputPath(TaskAttemptContext taskContext) 
+      throws IOException {
     JobConf conf = taskContext.getJobConf();
     Path outputPath = FileOutputFormat.getOutputPath(conf);
     if (outputPath != null) {
@@ -247,4 +299,60 @@ public class FileOutputCommitter extends
     }
     return taskTmpDir;
   }
+  
+  @Override
+  public boolean isRecoverySupported() {
+    return true;
+  }
+  
+  @Override
+  public void recoverTask(TaskAttemptContext context)
+      throws IOException {
+    Path outputPath = FileOutputFormat.getOutputPath(context.getJobConf());
+    context.progress();
+    Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
+    int previousAttempt =         
+        context.getConfiguration().getInt(
+            MRConstants.APPLICATION_ATTEMPT_ID, 0) - 1;
+    if (previousAttempt < 0) {
+      LOG.warn("Cannot recover task output for first attempt...");
+      return;
+    }
+
+    FileSystem outputFileSystem = 
+        outputPath.getFileSystem(context.getJobConf());
+    Path pathToRecover = 
+        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
+    if (outputFileSystem.exists(pathToRecover)) {
+      // Move the task outputs to their final place
+      moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
+      LOG.info("Saved output of job to " + jobOutputPath);
+    }
+  }
+
+  protected static String getJobAttemptBaseDirName(JobContext context) {
+    int appAttemptId = 
+        context.getJobConf().getInt(
+            MRConstants.APPLICATION_ATTEMPT_ID, 0);
+    return getJobAttemptBaseDirName(appAttemptId);
+  }
+
+  protected static String getJobTempDirName(TaskAttemptContext context) {
+    int appAttemptId = 
+        context.getJobConf().getInt(
+            MRConstants.APPLICATION_ATTEMPT_ID, 0);
+    return getJobAttemptBaseDirName(appAttemptId);
+  }
+
+  protected static String getJobAttemptBaseDirName(int appAttemptId) {
+    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + 
+      + appAttemptId;
+  }
+
+  protected static String getTaskAttemptBaseDirName(
+      TaskAttemptContext context) {
+    return getJobTempDirName(context) + Path.SEPARATOR + 
+      FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+      "_" + context.getTaskAttemptID().toString();
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java Tue Oct 11 18:27:51 2011
@@ -60,4 +60,9 @@ public interface MRConstants {
   
   /** Used in MRv1, mostly in TaskTracker code **/
   public static final String WORKDIR = "work";
+
+  /** Used on by MRv2 */
+  public static final String APPLICATION_ATTEMPT_ID =
+      "mapreduce.job.application.attempt.id";
+
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -152,6 +152,33 @@ public abstract class OutputCommitter 
    * is a bridge between the two.
    */
   @Override
+  public boolean isRecoverySupported() {
+    return false;
+  }
+
+  /**
+   * Recover the task output. 
+   * 
+   * The retry-count for the job will be passed via the 
+   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in  
+   * {@link TaskAttemptContext#getConfiguration()} for the 
+   * <code>OutputCommitter</code>.
+   * 
+   * If an exception is thrown the task will be attempted again. 
+   * 
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException
+   */
+  public void recoverTask(TaskAttemptContext taskContext) 
+  throws IOException {
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
   public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
                              ) throws IOException {
     setupJob((JobContext) jobContext);
@@ -246,4 +273,17 @@ public abstract class OutputCommitter 
                  ) throws IOException {
     abortTask((TaskAttemptContext) taskContext);
   }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final 
+  void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
+      ) throws IOException {
+    recoverTask((TaskAttemptContext) taskContext);
+  }
+
 }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1182008&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+@SuppressWarnings("unchecked")
+public class TestFileOutputCommitter extends TestCase {
+  private static Path outDir = new Path(System.getProperty("test.build.data",
+      "/tmp"), "output");
+
+  // A random task attempt id for testing.
+  private static String attempt = "attempt_200707121733_0001_m_000000_0";
+  private static String partFile = "part-00000";
+  private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+  private Text key1 = new Text("key1");
+  private Text key2 = new Text("key2");
+  private Text val1 = new Text("val1");
+  private Text val2 = new Text("val2");
+
+  
+  private void writeOutput(RecordWriter theRecordWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    NullWritable nullWritable = NullWritable.get();
+
+    try {
+      theRecordWriter.write(key1, val1);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val1);
+      theRecordWriter.write(nullWritable, val2);
+      theRecordWriter.write(key2, nullWritable);
+      theRecordWriter.write(key1, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key2, val2);
+    } finally {
+      theRecordWriter.close(null);
+    }
+  }
+
+  
+  public void testRecovery() throws Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = 
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    Path jobTempDir1 = new Path(outDir, 
+        FileOutputCommitter.getJobAttemptBaseDirName(
+            conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
+    assertTrue((new File(jobTempDir1.toString()).exists()));
+    validateContent(jobTempDir1);
+    
+    
+    
+    //now while running the second app attempt, 
+    //recover the task output from first attempt
+    JobConf conf2 = new JobConf(conf);
+    conf2.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf2.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
+    JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
+    TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
+    FileOutputCommitter committer2 = new FileOutputCommitter();
+    committer.setupJob(jContext2);
+    Path jobTempDir2 = new Path(outDir, 
+        FileOutputCommitter.getJobAttemptBaseDirName(
+            conf2.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
+    assertTrue((new File(jobTempDir2.toString()).exists()));
+    
+    tContext2.getConfiguration().setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
+    committer2.recoverTask(tContext2);
+    validateContent(jobTempDir2);
+    
+    committer2.commitJob(jContext2);
+    validateContent(outDir);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  private void validateContent(Path dir) throws IOException {
+    File expectedFile = new File(new Path(dir, partFile).toString());
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(key1).append('\t').append(val1).append("\n");
+    expectedOutput.append(val1).append("\n");
+    expectedOutput.append(val2).append("\n");
+    expectedOutput.append(key2).append("\n");
+    expectedOutput.append(key1).append("\n");
+    expectedOutput.append(key2).append('\t').append(val2).append("\n");
+    String output = slurp(expectedFile);
+    assertEquals(output, expectedOutput.toString());
+  }
+
+  
+  public void testCommitter() throws Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = 
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output
+    validateContent(outDir);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  
+  public void testAbort() throws IOException, InterruptedException {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = 
+        theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do abort
+    committer.abortTask(tContext);
+    FileSystem outputFileSystem = outDir.getFileSystem(conf);
+    Path workPath = new Path(outDir,
+        committer.getTaskAttemptBaseDirName(tContext))
+        .makeQualified(outputFileSystem);
+    File expectedFile = new File(new Path(workPath, partFile)
+        .toString());
+    assertFalse("task temp dir still exists", expectedFile.exists());
+
+    committer.abortJob(jContext, JobStatus.State.FAILED);
+    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+        .toString());
+    assertFalse("job temp dir still exists", expectedFile.exists());
+    assertEquals("Output directory not empty", 0, new File(outDir.toString())
+        .listFiles().length);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public static class FakeFileSystem extends RawLocalFileSystem {
+    public FakeFileSystem() {
+      super();
+    }
+
+    public URI getUri() {
+      return URI.create("faildel:///");
+    }
+
+    @Override
+    public boolean delete(Path p, boolean recursive) throws IOException {
+      throw new IOException("fake delete failed");
+    }
+  }
+
+  
+  public void testFailAbort() throws IOException, InterruptedException {
+    JobConf conf = new JobConf();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
+    conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();
+
+    // do setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    File jobTmpDir = new File(new Path(outDir,
+        FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+        conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0) +
+        Path.SEPARATOR +
+        FileOutputCommitter.TEMP_DIR_NAME).toString());
+    File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+    File expectedFile = new File(taskTmpDir, partFile);
+    TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+    RecordWriter<?, ?> theRecordWriter = 
+        theOutputFormat.getRecordWriter(null, conf, 
+            expectedFile.getAbsolutePath(), null);
+    writeOutput(theRecordWriter, tContext);
+
+    // do abort
+    Throwable th = null;
+    try {
+      committer.abortTask(tContext);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    assertTrue(expectedFile + " does not exists", expectedFile.exists());
+
+    th = null;
+    try {
+      committer.abortJob(jContext, JobStatus.State.FAILED);
+    } catch (IOException ie) {
+      th = ie;
+    }
+    assertNotNull(th);
+    assertTrue(th instanceof IOException);
+    assertTrue(th.getMessage().contains("fake delete failed"));
+    assertTrue("job temp dir does not exists", jobTmpDir.exists());
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public static String slurp(File f) throws IOException {
+    int len = (int) f.length();
+    byte[] buf = new byte[len];
+    FileInputStream in = new FileInputStream(f);
+    String contents = null;
+    try {
+      in.read(buf, 0, len);
+      contents = new String(buf, "UTF-8");
+    } finally {
+      in.close();
+    }
+    return contents;
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -175,7 +175,12 @@ public class TestFileOutputCommitter ext
     // do setup
     committer.setupJob(jContext);
     committer.setupTask(tContext);
+    
     String file = "test.txt";
+    String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
+    File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
+    File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
+    File expectedFile = new File(taskTmpDir, file);
 
     // A reporter that does nothing
     Reporter reporter = Reporter.NULL;
@@ -183,7 +188,7 @@ public class TestFileOutputCommitter ext
     FileSystem localFs = new FakeFileSystem();
     TextOutputFormat theOutputFormat = new TextOutputFormat();
     RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(localFs,
-        job, file, reporter);
+        job, expectedFile.getAbsolutePath(), reporter);
     writeOutput(theRecordWriter, reporter);
 
     // do abort
@@ -196,10 +201,6 @@ public class TestFileOutputCommitter ext
     assertNotNull(th);
     assertTrue(th instanceof IOException);
     assertTrue(th.getMessage().contains("fake delete failed"));
-    File jobTmpDir = new File(new Path(outDir,
-        FileOutputCommitter.TEMP_DIR_NAME).toString());
-    File taskTmpDir = new File(jobTmpDir, "_" + taskID);
-    File expectedFile = new File(taskTmpDir, file);
     assertTrue(expectedFile + " does not exists", expectedFile.exists());
 
     th = null;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -188,9 +188,9 @@ public class TestFileOutputCommitter ext
     assertNotNull(th);
     assertTrue(th instanceof IOException);
     assertTrue(th.getMessage().contains("fake delete failed"));
-    String filename = committer.getTaskAttemptBaseDirName(tContext);
+    String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
     File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
-    File taskTmpDir = new File(outDir.toString(), filename);
+    File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
     File expectedFile = new File(taskTmpDir, partFile);
     assertTrue(expectedFile + " does not exists", expectedFile.exists());