You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/06 23:17:18 UTC

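svn commit: r1241218 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/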

Author: vinodkv
Date: Mon Feb  6 22:17:18 2012
New Revision: 1241218

URL: http://svn.apache.org/viewvc?rev=1241218&view=rev
Log:
MAPREDUCE-3808. Fixed an NPE in FileOutputCommitter for jobs with maps but no reduces. Contributed by Robert Joseph Evans.
svn merge --ignore-ancestry -c 1241217 ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1241218&r1=1241217&r2=1241218&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Feb  6 22:17:18 2012
@@ -670,6 +670,9 @@ Release 0.23.1 - Unreleased
 
     MAPREDUCE-3813. Added a cache for resolved racks. (vinodkv via acmurthy)   
 
+    MAPREDUCE-3808. Fixed an NPE in FileOutputCommitter for jobs with maps
+    but no reduces. (Robert Joseph Evans via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1241218&r1=1241217&r2=1241218&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Mon Feb  6 22:17:18 2012
@@ -85,18 +85,21 @@ public class FileOutputCommitter extends
    */
   @Private
   Path getJobAttemptPath(JobContext context) {
-    return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-        .getJobAttemptPath(context, getOutputPath(context));
+    Path out = getOutputPath(context);
+    return out == null ? null : 
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+        .getJobAttemptPath(context, out);
   }
 
   @Private
   Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
-    return getTaskAttemptPath(context, getOutputPath(context));
+    Path out = getOutputPath(context);
+    return out == null ? null : getTaskAttemptPath(context, out);
   }
 
   private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
     Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
-    if(workPath == null) {
+    if(workPath == null && out != null) {
       return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
       .getTaskAttemptPath(context, out);
     }
@@ -110,14 +113,17 @@ public class FileOutputCommitter extends
    * @return the path where the output of a committed task is stored until
    * the entire job is committed.
    */
+  @Private
   Path getCommittedTaskPath(TaskAttemptContext context) {
-    return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-        .getCommittedTaskPath(context, getOutputPath(context));
+    Path out = getOutputPath(context);
+    return out == null ? null : 
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+        .getCommittedTaskPath(context, out);
   }
 
   public Path getWorkPath(TaskAttemptContext context, Path outputPath) 
   throws IOException {
-    return getTaskAttemptPath(context, outputPath);
+    return outputPath == null ? null : getTaskAttemptPath(context, outputPath);
   }
   
   @Override
@@ -156,6 +162,7 @@ public class FileOutputCommitter extends
     getWrapped(context).abortJob(context, state);
   }
   
+  @Override
   public void setupTask(TaskAttemptContext context) throws IOException {
     getWrapped(context).setupTask(context);
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1241218&r1=1241217&r2=1241218&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Mon Feb  6 22:17:18 2012
@@ -495,36 +495,40 @@ public class FileOutputCommitter extends
   @Override
   public void recoverTask(TaskAttemptContext context)
       throws IOException {
-    context.progress();
-    TaskAttemptID attemptId = context.getTaskAttemptID();
-    int previousAttempt = getAppAttemptId(context) - 1;
-    if (previousAttempt < 0) {
-      throw new IOException ("Cannot recover task output for first attempt...");
-    }
-    
-    Path committedTaskPath = getCommittedTaskPath(context);
-    Path previousCommittedTaskPath = getCommittedTaskPath(
-        previousAttempt, context);
-    FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
-    
-    LOG.debug("Trying to recover task from " + previousCommittedTaskPath 
-        + " into " + committedTaskPath);
-    if (fs.exists(previousCommittedTaskPath)) {
-      if(fs.exists(committedTaskPath)) {
-        if(!fs.delete(committedTaskPath, true)) {
-          throw new IOException("Could not delete "+committedTaskPath);
-        }
+    if(hasOutputPath()) {
+      context.progress();
+      TaskAttemptID attemptId = context.getTaskAttemptID();
+      int previousAttempt = getAppAttemptId(context) - 1;
+      if (previousAttempt < 0) {
+        throw new IOException ("Cannot recover task output for first attempt...");
       }
-      //Rename can fail if the parent directory does not yet exist.
-      Path committedParent = committedTaskPath.getParent();
-      fs.mkdirs(committedParent);
-      if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
-        throw new IOException("Could not rename " + previousCommittedTaskPath +
-            " to " + committedTaskPath);
+
+      Path committedTaskPath = getCommittedTaskPath(context);
+      Path previousCommittedTaskPath = getCommittedTaskPath(
+          previousAttempt, context);
+      FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
+
+      LOG.debug("Trying to recover task from " + previousCommittedTaskPath 
+          + " into " + committedTaskPath);
+      if (fs.exists(previousCommittedTaskPath)) {
+        if(fs.exists(committedTaskPath)) {
+          if(!fs.delete(committedTaskPath, true)) {
+            throw new IOException("Could not delete "+committedTaskPath);
+          }
+        }
+        //Rename can fail if the parent directory does not yet exist.
+        Path committedParent = committedTaskPath.getParent();
+        fs.mkdirs(committedParent);
+        if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+          throw new IOException("Could not rename " + previousCommittedTaskPath +
+              " to " + committedTaskPath);
+        }
+        LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
+      } else {
+        LOG.warn(attemptId+" had no output to recover.");
       }
-      LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
     } else {
-      LOG.warn(attemptId+" had no output to recover.");
+      LOG.warn("Output Path is null in recoverTask()");
     }
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1241218&r1=1241217&r2=1241218&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Mon Feb  6 22:17:18 2012
@@ -104,7 +104,9 @@ public class TestFileOutputCommitter ext
     writeOutput(theRecordWriter, tContext);
 
     // do commit
-    committer.commitTask(tContext);
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
     Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
     File jtd1 = new File(jobTempDir1.toUri().getPath());
     assertTrue(jtd1.exists());
@@ -188,7 +190,9 @@ public class TestFileOutputCommitter ext
     writeOutput(theRecordWriter, tContext);
 
     // do commit
-    committer.commitTask(tContext);
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
     committer.commitJob(jContext);
 
     // validate output
@@ -214,7 +218,9 @@ public class TestFileOutputCommitter ext
     writeMapFileOutput(theRecordWriter, tContext);
 
     // do commit
-    committer.commitTask(tContext);
+    if(committer.needsTaskCommit(tContext)) {
+      committer.commitTask(tContext);
+    }
     committer.commitJob(jContext);
 
     // validate output
@@ -222,6 +228,28 @@ public class TestFileOutputCommitter ext
     FileUtil.fullyDelete(new File(outDir.toString()));
   }
   
+  public void testMapOnlyNoOutput() throws Exception {
+    JobConf conf = new JobConf();
+    //This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
+    conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+    JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+    FileOutputCommitter committer = new FileOutputCommitter();    
+    
+    // setup
+    committer.setupJob(jContext);
+    committer.setupTask(tContext);
+    
+    if(committer.needsTaskCommit(tContext)) {
+      // do commit
+      committer.commitTask(tContext);
+    }
+    committer.commitJob(jContext);
+
+    // validate output
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+  
   public void testAbort() throws IOException, InterruptedException {
     JobConf conf = new JobConf();
     FileOutputFormat.setOutputPath(conf, outDir);