You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/10/14 03:16:31 UTC

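svn commit: r1183185 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/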

Author: acmurthy
Date: Fri Oct 14 01:16:30 2011
New Revision: 1183185

URL: http://svn.apache.org/viewvc?rev=1183185&view=rev
Log:
MAPREDUCE-3170. Fixed job output commit for deep hierarchies. Contributed by Hitesh Shah.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Oct 14 01:16:30 2011
@@ -1608,6 +1608,9 @@ Release 0.23.0 - Unreleased
 
     MAPREDUCE-2789. Complete schedulingInfo on CLI. (Eric Payne via acmurthy) 
 
+    MAPREDUCE-3170. Fixed job output commit for deep hierarchies. (Hitesh Shah
+    via acmurthy)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Fri Oct 14 01:16:30 2011
@@ -71,27 +71,30 @@ public class FileOutputCommitter extends
     //delete the task temp directory from the current jobtempdir
     JobConf conf = context.getJobConf();
     Path outputPath = FileOutputFormat.getOutputPath(conf);
-    FileSystem outputFileSystem = outputPath.getFileSystem(conf);
-    Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
-        Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-    FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-    if (fileSys.exists(tmpDir)) {
-      fileSys.delete(tmpDir, true);
-    } else {
-      LOG.warn("Task temp dir could not be deleted " + tmpDir);
-    }
-    
-    //move the job output to final place
-    Path jobOutputPath = 
-        new Path(outputPath, getJobAttemptBaseDirName(context));
-    moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
-    
-    // delete the _temporary folder in the output folder
-    cleanupJob(context);
-    // check if the output-dir marking is required
-    if (shouldMarkOutputDir(context.getJobConf())) {
-      // create a _success file in the output folder
-      markOutputDirSuccessful(context);
+    if (outputPath != null) {
+      FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+          Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+      if (fileSys.exists(tmpDir)) {
+        fileSys.delete(tmpDir, true);
+      } else {
+        LOG.warn("Task temp dir could not be deleted " + tmpDir);
+      }
+
+      //move the job output to final place
+      Path jobOutputPath = 
+          new Path(outputPath, getJobAttemptBaseDirName(context));
+      moveJobOutputs(outputFileSystem, 
+          jobOutputPath, outputPath, jobOutputPath);
+
+      // delete the _temporary folder in the output folder
+      cleanupJob(context);
+      // check if the output-dir marking is required
+      if (shouldMarkOutputDir(context.getJobConf())) {
+        // create a _success file in the output folder
+        markOutputDirSuccessful(context);
+      }
     }
   }
   
@@ -109,10 +112,14 @@ public class FileOutputCommitter extends
     }
   }
 
-  private void moveJobOutputs(FileSystem fs,
+  private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
       Path finalOutputDir, Path jobOutput) throws IOException {
+    LOG.debug("Told to move job output from " + jobOutput
+        + " to " + finalOutputDir + 
+        " and orig job output path is " + origJobOutputPath);  
     if (fs.isFile(jobOutput)) {
-      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      Path finalOutputPath = 
+          getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
       if (!fs.rename(jobOutput, finalOutputPath)) {
         if (!fs.delete(finalOutputPath, true)) {
           throw new IOException("Failed to delete earlier output of job");
@@ -121,18 +128,23 @@ public class FileOutputCommitter extends
           throw new IOException("Failed to save output of job");
         }
       }
-      LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
+      LOG.debug("Moved job output file from " + jobOutput + " to " + 
+          finalOutputPath);
     } else if (fs.getFileStatus(jobOutput).isDirectory()) {
+      LOG.debug("Job output file " + jobOutput + " is a dir");      
       FileStatus[] paths = fs.listStatus(jobOutput);
-      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      Path finalOutputPath = 
+          getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
       fs.mkdirs(finalOutputPath);
+      LOG.debug("Creating dirs along job output path " + finalOutputPath);
       if (paths != null) {
         for (FileStatus path : paths) {
-          moveJobOutputs(fs, finalOutputDir, path.getPath());
+          moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
         }
       }
     }
   }
+  
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
@@ -199,8 +211,10 @@ public class FileOutputCommitter extends
   throws IOException {
     TaskAttemptID attemptId = context.getTaskAttemptID();
     context.getProgressible().progress();
+    LOG.debug("Told to move taskoutput from " + taskOutput
+        + " to " + jobOutputDir);    
     if (fs.isFile(taskOutput)) {
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
+      Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput, 
                                           getTempTaskOutputPath(context));
       if (!fs.rename(taskOutput, finalOutputPath)) {
         if (!fs.delete(finalOutputPath, true)) {
@@ -214,10 +228,12 @@ public class FileOutputCommitter extends
       }
       LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
     } else if(fs.getFileStatus(taskOutput).isDirectory()) {
+      LOG.debug("Taskoutput " + taskOutput + " is a dir");
       FileStatus[] paths = fs.listStatus(taskOutput);
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
+      Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput, 
 	          getTempTaskOutputPath(context));
       fs.mkdirs(finalOutputPath);
+      LOG.debug("Creating dirs along path " + finalOutputPath);
       if (paths != null) {
         for (FileStatus path : paths) {
           moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
@@ -235,14 +251,16 @@ public class FileOutputCommitter extends
     }
   }
 
-  private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
+  @SuppressWarnings("deprecation")
+  private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput, 
                             Path taskOutputPath) throws IOException {
-    URI taskOutputUri = taskOutput.toUri();
-    URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
+    URI taskOutputUri = taskOutput.makeQualified(fs).toUri();
+    URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri();
+    URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
     if (taskOutputUri == relativePath) { 
       //taskOutputPath is not a parent of taskOutput
       throw new IOException("Can not get the relative path: base = " + 
-          taskOutputPath + " child = " + taskOutput);
+          taskOutputPathUri + " child = " + taskOutputUri);
     }
     if (relativePath.getPath().length() > 0) {
       return new Path(jobOutputDir, relativePath.getPath());
@@ -325,7 +343,10 @@ public class FileOutputCommitter extends
         new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
     if (outputFileSystem.exists(pathToRecover)) {
       // Move the task outputs to their final place
-      moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
+      LOG.debug("Trying to recover task from " + pathToRecover
+          + " into " + jobOutputPath);
+      moveJobOutputs(outputFileSystem, 
+          pathToRecover, jobOutputPath, pathToRecover);
       LOG.info("Saved output of job to " + jobOutputPath);
     }
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Fri Oct 14 01:16:30 2011
@@ -111,32 +111,48 @@ public class FileOutputCommitter extends
    * @param context the job's context
    */
   public void commitJob(JobContext context) throws IOException {
-    //delete the task temp directory from the current jobtempdir
-    Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
-        Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-    FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-    if (fileSys.exists(tmpDir)) {
-      fileSys.delete(tmpDir, true);
-    } else {
-      LOG.warn("Task temp dir could not be deleted " + tmpDir);
-    }
-    
-	  //move the job output to final place
-    Path jobOutputPath = 
-        new Path(outputPath, getJobAttemptBaseDirName(context));
-	  moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
-	  
-    // delete the _temporary folder and create a _done file in the o/p folder
-    cleanupJob(context);
-    if (shouldMarkOutputDir(context.getConfiguration())) {
-      markOutputDirSuccessful(context);
+    if (outputPath != null) {
+      //delete the task temp directory from the current jobtempdir
+      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+          Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+      if (fileSys.exists(tmpDir)) {
+        fileSys.delete(tmpDir, true);
+      } else {
+        LOG.warn("Task temp dir could not be deleted " + tmpDir);
+      }
+
+      //move the job output to final place
+      Path jobOutputPath = 
+          new Path(outputPath, getJobAttemptBaseDirName(context));
+      moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
+
+      // delete the _temporary folder and create a _done file in the o/p folder
+      cleanupJob(context);
+      if (shouldMarkOutputDir(context.getConfiguration())) {
+        markOutputDirSuccessful(context);
+      }
     }
   }
 
-  private void moveJobOutputs(FileSystem fs,
+  /**
+   * Move job output to final location 
+   * @param fs Filesystem handle
+   * @param origJobOutputPath The original location of the job output
+   * Required to generate the relative path for correct moving of data. 
+   * @param finalOutputDir The final output directory to which the job output 
+   *                       needs to be moved
+   * @param jobOutput The current job output directory being moved 
+   * @throws IOException
+   */
+  private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath, 
       Path finalOutputDir, Path jobOutput) throws IOException {
+    LOG.debug("Told to move job output from " + jobOutput
+        + " to " + finalOutputDir + 
+        " and orig job output path is " + origJobOutputPath);    
     if (fs.isFile(jobOutput)) {
-      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      Path finalOutputPath = 
+          getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
       if (!fs.rename(jobOutput, finalOutputPath)) {
         if (!fs.delete(finalOutputPath, true)) {
           throw new IOException("Failed to delete earlier output of job");
@@ -145,14 +161,18 @@ public class FileOutputCommitter extends
           throw new IOException("Failed to save output of job");
         }
       }
-      LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
+      LOG.debug("Moved job output file from " + jobOutput + " to " + 
+          finalOutputPath);
     } else if (fs.getFileStatus(jobOutput).isDirectory()) {
+      LOG.debug("Job output file " + jobOutput + " is a dir");
       FileStatus[] paths = fs.listStatus(jobOutput);
-      Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+      Path finalOutputPath = 
+          getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
       fs.mkdirs(finalOutputPath);
+      LOG.debug("Creating dirs along job output path " + finalOutputPath);
       if (paths != null) {
         for (FileStatus path : paths) {
-          moveJobOutputs(fs, finalOutputDir, path.getPath());
+          moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
         }
       }
     }
@@ -233,6 +253,8 @@ public class FileOutputCommitter extends
   throws IOException {
     TaskAttemptID attemptId = context.getTaskAttemptID();
     context.progress();
+    LOG.debug("Told to move taskoutput from " + taskOutput
+        + " to " + jobOutputDir);    
     if (fs.isFile(taskOutput)) {
       Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
                                           workPath);
@@ -248,9 +270,11 @@ public class FileOutputCommitter extends
       }
       LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
     } else if(fs.getFileStatus(taskOutput).isDirectory()) {
+      LOG.debug("Taskoutput " + taskOutput + " is a dir");
       FileStatus[] paths = fs.listStatus(taskOutput);
       Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
       fs.mkdirs(finalOutputPath);
+      LOG.debug("Creating dirs along path " + finalOutputPath);
       if (paths != null) {
         for (FileStatus path : paths) {
           moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
@@ -281,12 +305,17 @@ public class FileOutputCommitter extends
    * @throws IOException
    */
   private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
-                            Path taskOutputPath) throws IOException {
-    URI taskOutputUri = taskOutput.toUri();
-    URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
+                            Path taskOutputPath) throws IOException {    
+    URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(), 
+        outputFileSystem.getWorkingDirectory()).toUri();
+    URI taskOutputPathUri = 
+        taskOutputPath.makeQualified(
+            outputFileSystem.getUri(),
+            outputFileSystem.getWorkingDirectory()).toUri();
+    URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
     if (taskOutputUri == relativePath) {
       throw new IOException("Can not get the relative path: base = " + 
-          taskOutputPath + " child = " + taskOutput);
+          taskOutputPathUri + " child = " + taskOutputUri);
     }
     if (relativePath.getPath().length() > 0) {
       return new Path(jobOutputDir, relativePath.getPath());
@@ -334,9 +363,12 @@ public class FileOutputCommitter extends
 
     Path pathToRecover = 
         new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
+    LOG.debug("Trying to recover task from " + pathToRecover
+        + " into " + jobOutputPath);
     if (outputFileSystem.exists(pathToRecover)) {
       // Move the task outputs to their final place
-      moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
+      moveJobOutputs(outputFileSystem, 
+          pathToRecover, jobOutputPath, pathToRecover);
       LOG.info("Saved output of job to " + jobOutputPath);
     }
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Fri Oct 14 01:16:30 2011
@@ -25,13 +25,17 @@ import java.net.URI;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 
+
 @SuppressWarnings("unchecked")
 public class TestFileOutputCommitter extends TestCase {
   private static Path outDir = new Path(System.getProperty("test.build.data",
@@ -65,6 +69,20 @@ public class TestFileOutputCommitter ext
     }
   }
 
+  private void writeMapFileOutput(RecordWriter theRecordWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    try {
+      int key = 0;
+      for (int i = 0 ; i < 10; ++i) {
+        key = i;
+        Text val = (i%2 == 1) ? val1 : val2;
+        theRecordWriter.write(new LongWritable(key),
+            val);        
+      }
+    } finally {
+      theRecordWriter.close(null);
+    }
+  }
   
   public void testRecovery() throws Exception {
     JobConf conf = new JobConf();
@@ -91,9 +109,7 @@ public class TestFileOutputCommitter ext
         FileOutputCommitter.getJobAttemptBaseDirName(
             conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
     assertTrue((new File(jobTempDir1.toString()).exists()));
-    validateContent(jobTempDir1);
-    
-    
+    validateContent(jobTempDir1);        
     
     //now while running the second app attempt, 
     //recover the task output from first attempt
@@ -131,6 +147,29 @@ public class TestFileOutputCommitter ext
     assertEquals(output, expectedOutput.toString());
   }
 
+  private void validateMapFileOutputContent(
+      FileSystem fs, Path dir) throws IOException {
+    // map output is a directory with index and data files
+    Path expectedMapDir = new Path(dir, partFile);
+    assert(fs.getFileStatus(expectedMapDir).isDirectory());    
+    FileStatus[] files = fs.listStatus(expectedMapDir);
+    int fileCount = 0;
+    boolean dataFileFound = false; 
+    boolean indexFileFound = false; 
+    for (FileStatus f : files) {
+      if (f.isFile()) {
+        ++fileCount;
+        if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
+          indexFileFound = true;
+        }
+        else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
+          dataFileFound = true;
+        }
+      }
+    }
+    assert(fileCount > 0);
+    assert(dataFileFound && indexFileFound);
+  }
   
   public void testCommitter() throws Exception {
     JobConf conf = new JobConf();
@@ -159,6 +198,31 @@ public class TestFileOutputCommitter ext
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
+  public void testMapFileOutputCommitter() throws Exception {
+    JobConf conf = new JobConf();
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();    
+    
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(null, conf, partFile, null);
+    writeMapFileOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output
+    validateMapFileOutputContent(FileSystem.get(conf), outDir);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
   
   public void testAbort() throws IOException, InterruptedException {
     JobConf conf = new JobConf();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Fri Oct 14 01:16:30 2011
@@ -26,10 +26,13 @@ import java.net.URI;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
@@ -75,6 +78,20 @@ public class TestFileOutputCommitter ext
     }
   }
 
+  private void writeMapFileOutput(RecordWriter theRecordWriter,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    try {
+      int key = 0;
+      for (int i = 0 ; i < 10; ++i) {
+        key = i;
+        Text val = (i%2 == 1) ? val1 : val2;
+        theRecordWriter.write(new LongWritable(key),
+            val);        
+      }
+    } finally {
+      theRecordWriter.close(context);
+    }
+  }
   
   public void testRecovery() throws Exception {
     Job job = Job.getInstance();
@@ -101,9 +118,7 @@ public class TestFileOutputCommitter ext
         FileOutputCommitter.getJobAttemptBaseDirName(
             conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
     assertTrue((new File(jobTempDir1.toString()).exists()));
-    validateContent(jobTempDir1);
-    
-    
+    validateContent(jobTempDir1);    
     
     //now while running the second app attempt, 
     //recover the task output from first attempt
@@ -141,6 +156,29 @@ public class TestFileOutputCommitter ext
     assertEquals(output, expectedOutput.toString());
   }
 
+  private void validateMapFileOutputContent(
+      FileSystem fs, Path dir) throws IOException {
+    // map output is a directory with index and data files
+    Path expectedMapDir = new Path(dir, partFile);
+    assert(fs.getFileStatus(expectedMapDir).isDirectory());    
+    FileStatus[] files = fs.listStatus(expectedMapDir);
+    int fileCount = 0;
+    boolean dataFileFound = false; 
+    boolean indexFileFound = false; 
+    for (FileStatus f : files) {
+      if (f.isFile()) {
+        ++fileCount;
+        if (f.getPath().getName().equals(MapFile.INDEX_FILE_NAME)) {
+          indexFileFound = true;
+        }
+        else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
+          dataFileFound = true;
+        }
+      }
+    }
+    assert(fileCount > 0);
+    assert(dataFileFound && indexFileFound);
+  }
   
   public void testCommitter() throws Exception {
     Job job = Job.getInstance();
@@ -169,6 +207,32 @@ public class TestFileOutputCommitter ext
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
 
+  public void testMapFileOutputCommitter() throws Exception {
+    Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());    
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+
+    // write output
+    MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+    writeMapFileOutput(theRecordWriter, tContext);
+
+    // do commit
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+
+    // validate output
+    validateMapFileOutputContent(FileSystem.get(job.getConfiguration()), outDir);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
   
   public void testAbort() throws IOException, InterruptedException {
     Job job = Job.getInstance();