You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/06 23:17:18 UTC
svn commit: r1241218 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/o...
Author: vinodkv
Date: Mon Feb 6 22:17:18 2012
New Revision: 1241218
URL: http://svn.apache.org/viewvc?rev=1241218&view=rev
Log:
MAPREDUCE-3808. Fixed an NPE in FileOutputCommitter for jobs with maps but no reduces. Contributed by Robert Joseph Evans.
svn merge --ignore-ancestry -c 1241217 ../../trunk/
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1241218&r1=1241217&r2=1241218&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Feb 6 22:17:18 2012
@@ -670,6 +670,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3813. Added a cache for resolved racks. (vinodkv via acmurthy)
+ MAPREDUCE-3808. Fixed an NPE in FileOutputCommitter for jobs with maps
+ but no reduces. (Robert Joseph Evans via vinodkv)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1241218&r1=1241217&r2=1241218&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Mon Feb 6 22:17:18 2012
@@ -85,18 +85,21 @@ public class FileOutputCommitter extends
*/
@Private
Path getJobAttemptPath(JobContext context) {
- return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
- .getJobAttemptPath(context, getOutputPath(context));
+ Path out = getOutputPath(context);
+ return out == null ? null :
+ org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+ .getJobAttemptPath(context, out);
}
@Private
Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
- return getTaskAttemptPath(context, getOutputPath(context));
+ Path out = getOutputPath(context);
+ return out == null ? null : getTaskAttemptPath(context, out);
}
private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
- if(workPath == null) {
+ if(workPath == null && out != null) {
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getTaskAttemptPath(context, out);
}
@@ -110,14 +113,17 @@ public class FileOutputCommitter extends
* @return the path where the output of a committed task is stored until
* the entire job is committed.
*/
+ @Private
Path getCommittedTaskPath(TaskAttemptContext context) {
- return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
- .getCommittedTaskPath(context, getOutputPath(context));
+ Path out = getOutputPath(context);
+ return out == null ? null :
+ org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+ .getCommittedTaskPath(context, out);
}
public Path getWorkPath(TaskAttemptContext context, Path outputPath)
throws IOException {
- return getTaskAttemptPath(context, outputPath);
+ return outputPath == null ? null : getTaskAttemptPath(context, outputPath);
}
@Override
@@ -156,6 +162,7 @@ public class FileOutputCommitter extends
getWrapped(context).abortJob(context, state);
}
+ @Override
public void setupTask(TaskAttemptContext context) throws IOException {
getWrapped(context).setupTask(context);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1241218&r1=1241217&r2=1241218&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Mon Feb 6 22:17:18 2012
@@ -495,36 +495,40 @@ public class FileOutputCommitter extends
@Override
public void recoverTask(TaskAttemptContext context)
throws IOException {
- context.progress();
- TaskAttemptID attemptId = context.getTaskAttemptID();
- int previousAttempt = getAppAttemptId(context) - 1;
- if (previousAttempt < 0) {
- throw new IOException ("Cannot recover task output for first attempt...");
- }
-
- Path committedTaskPath = getCommittedTaskPath(context);
- Path previousCommittedTaskPath = getCommittedTaskPath(
- previousAttempt, context);
- FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
-
- LOG.debug("Trying to recover task from " + previousCommittedTaskPath
- + " into " + committedTaskPath);
- if (fs.exists(previousCommittedTaskPath)) {
- if(fs.exists(committedTaskPath)) {
- if(!fs.delete(committedTaskPath, true)) {
- throw new IOException("Could not delete "+committedTaskPath);
- }
+ if(hasOutputPath()) {
+ context.progress();
+ TaskAttemptID attemptId = context.getTaskAttemptID();
+ int previousAttempt = getAppAttemptId(context) - 1;
+ if (previousAttempt < 0) {
+ throw new IOException ("Cannot recover task output for first attempt...");
}
- //Rename can fail if the parent directory does not yet exist.
- Path committedParent = committedTaskPath.getParent();
- fs.mkdirs(committedParent);
- if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
- throw new IOException("Could not rename " + previousCommittedTaskPath +
- " to " + committedTaskPath);
+
+ Path committedTaskPath = getCommittedTaskPath(context);
+ Path previousCommittedTaskPath = getCommittedTaskPath(
+ previousAttempt, context);
+ FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
+
+ LOG.debug("Trying to recover task from " + previousCommittedTaskPath
+ + " into " + committedTaskPath);
+ if (fs.exists(previousCommittedTaskPath)) {
+ if(fs.exists(committedTaskPath)) {
+ if(!fs.delete(committedTaskPath, true)) {
+ throw new IOException("Could not delete "+committedTaskPath);
+ }
+ }
+ //Rename can fail if the parent directory does not yet exist.
+ Path committedParent = committedTaskPath.getParent();
+ fs.mkdirs(committedParent);
+ if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+ throw new IOException("Could not rename " + previousCommittedTaskPath +
+ " to " + committedTaskPath);
+ }
+ LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
+ } else {
+ LOG.warn(attemptId+" had no output to recover.");
}
- LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
} else {
- LOG.warn(attemptId+" had no output to recover.");
+ LOG.warn("Output Path is null in recoverTask()");
}
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1241218&r1=1241217&r2=1241218&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Mon Feb 6 22:17:18 2012
@@ -104,7 +104,9 @@ public class TestFileOutputCommitter ext
writeOutput(theRecordWriter, tContext);
// do commit
- committer.commitTask(tContext);
+ if(committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
File jtd1 = new File(jobTempDir1.toUri().getPath());
assertTrue(jtd1.exists());
@@ -188,7 +190,9 @@ public class TestFileOutputCommitter ext
writeOutput(theRecordWriter, tContext);
// do commit
- committer.commitTask(tContext);
+ if(committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
committer.commitJob(jContext);
// validate output
@@ -214,7 +218,9 @@ public class TestFileOutputCommitter ext
writeMapFileOutput(theRecordWriter, tContext);
// do commit
- committer.commitTask(tContext);
+ if(committer.needsTaskCommit(tContext)) {
+ committer.commitTask(tContext);
+ }
committer.commitJob(jContext);
// validate output
@@ -222,6 +228,28 @@ public class TestFileOutputCommitter ext
FileUtil.fullyDelete(new File(outDir.toString()));
}
+ public void testMapOnlyNoOutput() throws Exception {
+ JobConf conf = new JobConf();
+ //This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir);
+ conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ FileOutputCommitter committer = new FileOutputCommitter();
+
+ // setup
+ committer.setupJob(jContext);
+ committer.setupTask(tContext);
+
+ if(committer.needsTaskCommit(tContext)) {
+ // do commit
+ committer.commitTask(tContext);
+ }
+ committer.commitJob(jContext);
+
+ // validate output
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+
public void testAbort() throws IOException, InterruptedException {
JobConf conf = new JobConf();
FileOutputFormat.setOutputPath(conf, outDir);