You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/04 01:06:25 UTC
svn commit: r1240414 [1/2] - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-cli...
Author: vinodkv
Date: Sat Feb 4 00:06:24 2012
New Revision: 1240414
URL: http://svn.apache.org/viewvc?rev=1240414&view=rev
Log:
MAPREDUCE-3711. Fixed MR AM recovery so that only single selected task output is recovered and thus reduce the unnecessarily bloated recovery time. Contributed by Robert Joseph Evans.
svn merge --ignore-ancestry -c 1240413 ../../trunk/
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Sat Feb 4 00:06:24 2012
@@ -628,6 +628,10 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3727. jobtoken location property in jobconf refers to wrong
jobtoken file (tucu)
+ MAPREDUCE-3711. Fixed MR AM recovery so that only single selected task
+ output is recovered and thus reduce the unnecessarily bloated recovery
+ time. (Robert Joseph Evans via vinodkv)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Feb 4 00:06:24 2012
@@ -559,6 +559,7 @@ public abstract class TaskImpl implement
}
private void internalError(TaskEventType type) {
+ LOG.error("Invalid event " + type + " on Task " + this.taskId);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
this.taskId.getJobId(), "Invalid event " + type +
" on Task " + this.taskId));
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Sat Feb 4 00:06:24 2012
@@ -103,6 +103,7 @@ public class LocalContainerAllocator ext
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+ LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.INTERNAL_ERROR));
throw new YarnException("Could not contact RM after " +
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Sat Feb 4 00:06:24 2012
@@ -32,8 +32,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -358,16 +360,24 @@ public class RecoveryService extends Com
//recover the task output
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
attInfo.getAttemptId());
- try {
- committer.recoverTask(taskContext);
+ try {
+ TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
+ int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
+ if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
+ committer.recoverTask(taskContext);
+ LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+ } else {
+ LOG.info("Will not try to recover output for "
+ + taskContext.getTaskAttemptID());
+ }
} catch (IOException e) {
+ LOG.error("Caught an exception while trying to recover task "+aId, e);
actualHandler.handle(new JobDiagnosticsUpdateEvent(
aId.getTaskId().getJobId(), "Error in recovering task output " +
e.getMessage()));
actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR));
}
- LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
// send the done event
LOG.info("Sending done event to " + aId);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Sat Feb 4 00:06:24 2012
@@ -543,6 +543,7 @@ public class RMContainerAllocator extend
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+ LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.INTERNAL_ERROR));
throw new YarnException("Could not contact RM after " +
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Sat Feb 4 00:06:24 2012
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
+@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRecovery {
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
@@ -112,7 +113,7 @@ public class TestRecovery {
Assert.assertEquals("Reduce Task state not correct",
TaskState.RUNNING, reduceTask.getReport().getTaskState());
- //send the fail signal to the 1st map task attempt
+ //send the fail signal to the 1st map task attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
@@ -193,7 +194,7 @@ public class TestRecovery {
//RUNNING state
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
- //send the done signal to the 2nd map task
+ //send the done signal to the 2nd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask2.getAttempts().values().iterator().next().getID(),
@@ -349,6 +350,151 @@ public class TestRecovery {
validateOutput();
}
+ @Test
+ public void testOutputRecoveryMapsOnly() throws Exception {
+ int runCount = 0;
+ MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+ true, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask1 = it.next();
+ Task mapTask2 = it.next();
+ Task reduceTask1 = it.next();
+
+ // all maps must be running
+ app.waitForState(mapTask1, TaskState.RUNNING);
+
+ TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+ .next();
+
+ //before sending the TA_DONE event, make sure attempt has come to
+ //RUNNING state
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+
+ // write output corresponding to map1 (This is just to validate that it is
+ //not included in the output)
+ writeBadOutput(task1Attempt1, conf);
+
+ //send the done signal to the map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for map task to complete
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ //stop the app before the job completes.
+ app.stop();
+
+ //rerun
+ //in rerun the map will be recovered from previous run
+ app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+ ++runCount);
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ it = job.getTasks().values().iterator();
+ mapTask1 = it.next();
+ mapTask2 = it.next();
+ reduceTask1 = it.next();
+
+ // map will be recovered, no need to send done
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port after recovery
+ task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ app.waitForState(mapTask2, TaskState.RUNNING);
+
+ TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
+ .next();
+
+ //before sending the TA_DONE event, make sure attempt has come to
+ //RUNNING state
+ app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
+
+ //send the done signal to the map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task2Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for map task to complete
+ app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port
+ Assert.assertEquals(5467, task2Attempt1.getShufflePort());
+
+ app.waitForState(reduceTask1, TaskState.RUNNING);
+ TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+ // write output corresponding to reduce1
+ writeOutput(reduce1Attempt1, conf);
+
+ //send the done signal to the 1st reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for first reduce task to complete
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ validateOutput();
+ }
+
+ private void writeBadOutput(TaskAttempt attempt, Configuration conf)
+ throws Exception {
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attempt.getID()));
+
+ TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+ RecordWriter theRecordWriter = theOutputFormat
+ .getRecordWriter(tContext);
+
+ NullWritable nullWritable = NullWritable.get();
+ try {
+ theRecordWriter.write(key2, val2);
+ theRecordWriter.write(null, nullWritable);
+ theRecordWriter.write(null, val2);
+ theRecordWriter.write(nullWritable, val1);
+ theRecordWriter.write(key1, nullWritable);
+ theRecordWriter.write(key2, null);
+ theRecordWriter.write(null, null);
+ theRecordWriter.write(key1, val1);
+ } finally {
+ theRecordWriter.close(tContext);
+ }
+
+ OutputFormat outputFormat = ReflectionUtils.newInstance(
+ tContext.getOutputFormatClass(), conf);
+ OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+ committer.commitTask(tContext);
+}
+
+
private void writeOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -19,14 +19,12 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/** An {@link OutputCommitter} that commits files specified
@@ -42,280 +40,140 @@ public class FileOutputCommitter extends
/**
* Temporary directory name
*/
- public static final String TEMP_DIR_NAME = "_temporary";
- public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
- static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
- "mapreduce.fileoutputcommitter.marksuccessfuljobs";
-
- public void setupJob(JobContext context) throws IOException {
+ public static final String TEMP_DIR_NAME =
+ org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
+ public static final String SUCCEEDED_FILE_NAME =
+ org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCEEDED_FILE_NAME;
+ static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
+
+ private static Path getOutputPath(JobContext context) {
+ JobConf conf = context.getJobConf();
+ return FileOutputFormat.getOutputPath(conf);
+ }
+
+ private static Path getOutputPath(TaskAttemptContext context) {
JobConf conf = context.getJobConf();
- Path outputPath = FileOutputFormat.getOutputPath(conf);
- if (outputPath != null) {
- Path tmpDir =
- new Path(outputPath, getJobAttemptBaseDirName(context) +
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(conf);
- if (!fileSys.mkdirs(tmpDir)) {
- LOG.error("Mkdirs failed to create " + tmpDir.toString());
- }
+ return FileOutputFormat.getOutputPath(conf);
+ }
+
+ private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped = null;
+
+ private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+ getWrapped(JobContext context) throws IOException {
+ if(wrapped == null) {
+ wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
+ getOutputPath(context), context);
}
+ return wrapped;
}
-
- // True if the job requires output.dir marked on successful job.
- // Note that by default it is set to true.
- private boolean shouldMarkOutputDir(JobConf conf) {
- return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
+
+ private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+ getWrapped(TaskAttemptContext context) throws IOException {
+ if(wrapped == null) {
+ wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
+ getOutputPath(context), context);
+ }
+ return wrapped;
}
- public void commitJob(JobContext context) throws IOException {
- //delete the task temp directory from the current jobtempdir
- JobConf conf = context.getJobConf();
- Path outputPath = FileOutputFormat.getOutputPath(conf);
- if (outputPath != null) {
- FileSystem outputFileSystem = outputPath.getFileSystem(conf);
- Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
- if (fileSys.exists(tmpDir)) {
- fileSys.delete(tmpDir, true);
- } else {
- LOG.warn("Task temp dir could not be deleted " + tmpDir);
- }
+ /**
+ * Compute the path where the output of a given job attempt will be placed.
+ * @param context the context of the job. This is used to get the
+ * application attempt id.
+ * @return the path to store job attempt data.
+ */
+ @Private
+ Path getJobAttemptPath(JobContext context) {
+ return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+ .getJobAttemptPath(context, getOutputPath(context));
+ }
- //move the job output to final place
- Path jobOutputPath =
- new Path(outputPath, getJobAttemptBaseDirName(context));
- moveJobOutputs(outputFileSystem,
- jobOutputPath, outputPath, jobOutputPath);
+ @Private
+ Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
+ return getTaskAttemptPath(context, getOutputPath(context));
+ }
- // delete the _temporary folder in the output folder
- cleanupJob(context);
- // check if the output-dir marking is required
- if (shouldMarkOutputDir(context.getJobConf())) {
- // create a _success file in the output folder
- markOutputDirSuccessful(context);
- }
+ private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
+ Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
+ if(workPath == null) {
+ return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+ .getTaskAttemptPath(context, out);
}
+ return workPath;
}
- // Create a _success file in the job's output folder
- private void markOutputDirSuccessful(JobContext context) throws IOException {
- JobConf conf = context.getJobConf();
- // get the o/p path
- Path outputPath = FileOutputFormat.getOutputPath(conf);
- if (outputPath != null) {
- // get the filesys
- FileSystem fileSys = outputPath.getFileSystem(conf);
- // create a file in the output folder to mark the job completion
- Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
- fileSys.create(filePath).close();
- }
+ /**
+ * Compute the path where the output of a committed task is stored until
+ * the entire job is committed.
+ * @param context the context of the task attempt
+ * @return the path where the output of a committed task is stored until
+ * the entire job is committed.
+ */
+ Path getCommittedTaskPath(TaskAttemptContext context) {
+ return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+ .getCommittedTaskPath(context, getOutputPath(context));
}
- private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
- Path finalOutputDir, Path jobOutput) throws IOException {
- LOG.debug("Told to move job output from " + jobOutput
- + " to " + finalOutputDir +
- " and orig job output path is " + origJobOutputPath);
- if (fs.isFile(jobOutput)) {
- Path finalOutputPath =
- getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
- if (!fs.rename(jobOutput, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new IOException("Failed to delete earlier output of job");
- }
- if (!fs.rename(jobOutput, finalOutputPath)) {
- throw new IOException("Failed to save output of job");
- }
- }
- LOG.debug("Moved job output file from " + jobOutput + " to " +
- finalOutputPath);
- } else if (fs.getFileStatus(jobOutput).isDirectory()) {
- LOG.debug("Job output file " + jobOutput + " is a dir");
- FileStatus[] paths = fs.listStatus(jobOutput);
- Path finalOutputPath =
- getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
- fs.mkdirs(finalOutputPath);
- LOG.debug("Creating dirs along job output path " + finalOutputPath);
- if (paths != null) {
- for (FileStatus path : paths) {
- moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
- }
- }
- }
+ public Path getWorkPath(TaskAttemptContext context, Path outputPath)
+ throws IOException {
+ return getTaskAttemptPath(context, outputPath);
+ }
+
+ @Override
+ public void setupJob(JobContext context) throws IOException {
+ getWrapped(context).setupJob(context);
+ }
+
+ @Override
+ public void commitJob(JobContext context) throws IOException {
+ getWrapped(context).commitJob(context);
}
@Override
@Deprecated
public void cleanupJob(JobContext context) throws IOException {
- JobConf conf = context.getJobConf();
- // do the clean up of temporary directory
- Path outputPath = FileOutputFormat.getOutputPath(conf);
- if (outputPath != null) {
- Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(conf);
- context.getProgressible().progress();
- if (fileSys.exists(tmpDir)) {
- fileSys.delete(tmpDir, true);
- } else {
- LOG.warn("Output Path is Null in cleanup");
- }
- }
+ getWrapped(context).cleanupJob(context);
}
@Override
public void abortJob(JobContext context, int runState)
throws IOException {
- // simply delete the _temporary dir from the o/p folder of the job
- cleanupJob(context);
+ JobStatus.State state;
+ if(runState == JobStatus.State.RUNNING.getValue()) {
+ state = JobStatus.State.RUNNING;
+ } else if(runState == JobStatus.State.SUCCEEDED.getValue()) {
+ state = JobStatus.State.SUCCEEDED;
+ } else if(runState == JobStatus.State.FAILED.getValue()) {
+ state = JobStatus.State.FAILED;
+ } else if(runState == JobStatus.State.PREP.getValue()) {
+ state = JobStatus.State.PREP;
+ } else if(runState == JobStatus.State.KILLED.getValue()) {
+ state = JobStatus.State.KILLED;
+ } else {
+ throw new IllegalArgumentException(runState+" is not a valid runState.");
+ }
+ getWrapped(context).abortJob(context, state);
}
public void setupTask(TaskAttemptContext context) throws IOException {
- // FileOutputCommitter's setupTask doesn't do anything. Because the
- // temporary task directory is created on demand when the
- // task is writing.
+ getWrapped(context).setupTask(context);
}
-
- public void commitTask(TaskAttemptContext context)
- throws IOException {
- Path taskOutputPath = getTempTaskOutputPath(context);
- TaskAttemptID attemptId = context.getTaskAttemptID();
- JobConf job = context.getJobConf();
- if (taskOutputPath != null) {
- FileSystem fs = taskOutputPath.getFileSystem(job);
- context.getProgressible().progress();
- if (fs.exists(taskOutputPath)) {
- // Move the task outputs to the current job attempt output dir
- JobConf conf = context.getJobConf();
- Path outputPath = FileOutputFormat.getOutputPath(conf);
- FileSystem outputFileSystem = outputPath.getFileSystem(conf);
- Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
- moveTaskOutputs(context, outputFileSystem, jobOutputPath,
- taskOutputPath);
-
- // Delete the temporary task-specific output directory
- if (!fs.delete(taskOutputPath, true)) {
- LOG.info("Failed to delete the temporary output" +
- " directory of task: " + attemptId + " - " + taskOutputPath);
- }
- LOG.info("Saved output of task '" + attemptId + "' to " +
- jobOutputPath);
- }
- }
- }
-
- private void moveTaskOutputs(TaskAttemptContext context,
- FileSystem fs,
- Path jobOutputDir,
- Path taskOutput)
- throws IOException {
- TaskAttemptID attemptId = context.getTaskAttemptID();
- context.getProgressible().progress();
- LOG.debug("Told to move taskoutput from " + taskOutput
- + " to " + jobOutputDir);
- if (fs.isFile(taskOutput)) {
- Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
- getTempTaskOutputPath(context));
- if (!fs.rename(taskOutput, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new IOException("Failed to delete earlier output of task: " +
- attemptId);
- }
- if (!fs.rename(taskOutput, finalOutputPath)) {
- throw new IOException("Failed to save output of task: " +
- attemptId);
- }
- }
- LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
- } else if(fs.getFileStatus(taskOutput).isDirectory()) {
- LOG.debug("Taskoutput " + taskOutput + " is a dir");
- FileStatus[] paths = fs.listStatus(taskOutput);
- Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
- getTempTaskOutputPath(context));
- fs.mkdirs(finalOutputPath);
- LOG.debug("Creating dirs along path " + finalOutputPath);
- if (paths != null) {
- for (FileStatus path : paths) {
- moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
- }
- }
- }
+
+ @Override
+ public void commitTask(TaskAttemptContext context) throws IOException {
+ getWrapped(context).commitTask(context, getTaskAttemptPath(context));
}
+ @Override
public void abortTask(TaskAttemptContext context) throws IOException {
- Path taskOutputPath = getTempTaskOutputPath(context);
- if (taskOutputPath != null) {
- FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
- context.getProgressible().progress();
- fs.delete(taskOutputPath, true);
- }
- }
-
- @SuppressWarnings("deprecation")
- private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput,
- Path taskOutputPath) throws IOException {
- URI taskOutputUri = taskOutput.makeQualified(fs).toUri();
- URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri();
- URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
- if (taskOutputUri == relativePath) {
- //taskOutputPath is not a parent of taskOutput
- throw new IOException("Can not get the relative path: base = " +
- taskOutputPathUri + " child = " + taskOutputUri);
- }
- if (relativePath.getPath().length() > 0) {
- return new Path(jobOutputDir, relativePath.getPath());
- } else {
- return jobOutputDir;
- }
+ getWrapped(context).abortTask(context, getTaskAttemptPath(context));
}
+ @Override
public boolean needsTaskCommit(TaskAttemptContext context)
throws IOException {
- Path taskOutputPath = getTempTaskOutputPath(context);
- if (taskOutputPath != null) {
- context.getProgressible().progress();
- // Get the file-system for the task output directory
- FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
- // since task output path is created on demand,
- // if it exists, task needs a commit
- if (fs.exists(taskOutputPath)) {
- return true;
- }
- }
- return false;
- }
-
- Path getTempTaskOutputPath(TaskAttemptContext taskContext)
- throws IOException {
- JobConf conf = taskContext.getJobConf();
- Path outputPath = FileOutputFormat.getOutputPath(conf);
- if (outputPath != null) {
- Path p = new Path(outputPath,
- (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
- "_" + taskContext.getTaskAttemptID().toString()));
- FileSystem fs = p.getFileSystem(conf);
- return p.makeQualified(fs);
- }
- return null;
- }
-
- Path getWorkPath(TaskAttemptContext taskContext, Path basePath)
- throws IOException {
- // ${mapred.out.dir}/_temporary
- Path jobTmpDir = new Path(basePath, FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fs = jobTmpDir.getFileSystem(taskContext.getJobConf());
- if (!fs.exists(jobTmpDir)) {
- throw new IOException("The temporary job-output directory " +
- jobTmpDir.toString() + " doesn't exist!");
- }
- // ${mapred.out.dir}/_temporary/_${taskid}
- String taskid = taskContext.getTaskAttemptID().toString();
- Path taskTmpDir = new Path(jobTmpDir, "_" + taskid);
- if (!fs.mkdirs(taskTmpDir)) {
- throw new IOException("Mkdirs failed to create "
- + taskTmpDir.toString());
- }
- return taskTmpDir;
+ return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context));
}
@Override
@@ -326,54 +184,6 @@ public class FileOutputCommitter extends
@Override
public void recoverTask(TaskAttemptContext context)
throws IOException {
- Path outputPath = FileOutputFormat.getOutputPath(context.getJobConf());
- context.progress();
- Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
- int previousAttempt =
- context.getConfiguration().getInt(
- MRConstants.APPLICATION_ATTEMPT_ID, 0) - 1;
- if (previousAttempt < 0) {
- LOG.warn("Cannot recover task output for first attempt...");
- return;
- }
-
- FileSystem outputFileSystem =
- outputPath.getFileSystem(context.getJobConf());
- Path pathToRecover =
- new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
- if (outputFileSystem.exists(pathToRecover)) {
- // Move the task outputs to their final place
- LOG.debug("Trying to recover task from " + pathToRecover
- + " into " + jobOutputPath);
- moveJobOutputs(outputFileSystem,
- pathToRecover, jobOutputPath, pathToRecover);
- LOG.info("Saved output of job to " + jobOutputPath);
- }
- }
-
- protected static String getJobAttemptBaseDirName(JobContext context) {
- int appAttemptId =
- context.getJobConf().getInt(
- MRConstants.APPLICATION_ATTEMPT_ID, 0);
- return getJobAttemptBaseDirName(appAttemptId);
- }
-
- protected static String getJobTempDirName(TaskAttemptContext context) {
- int appAttemptId =
- context.getJobConf().getInt(
- MRConstants.APPLICATION_ATTEMPT_ID, 0);
- return getJobAttemptBaseDirName(appAttemptId);
- }
-
- protected static String getJobAttemptBaseDirName(int appAttemptId) {
- return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
- + appAttemptId;
- }
-
- protected static String getTaskAttemptBaseDirName(
- TaskAttemptContext context) {
- return getJobTempDirName(context) + Path.SEPARATOR +
- FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
- "_" + context.getTaskAttemptID().toString();
+ getWrapped(context).recoverTask(context);
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Sat Feb 4 00:06:24 2012
@@ -525,7 +525,7 @@ abstract public class Task implements Wr
if (outputPath != null) {
if ((committer instanceof FileOutputCommitter)) {
FileOutputFormat.setWorkOutputPath(conf,
- ((FileOutputCommitter)committer).getTempTaskOutputPath(taskContext));
+ ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
} else {
FileOutputFormat.setWorkOutputPath(conf, outputPath);
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -51,17 +51,21 @@ import org.apache.hadoop.classification.
* Discard the task commit.
* </li>
* </ol>
+ * The methods in this class can be called from several different processes and
+ * from several different contexts. It is important to know which process and
+ * which context each is called from. Each method should be marked accordingly
+ * in its documentation.
*
* @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
* @see JobContext
* @see TaskAttemptContext
- *
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter {
/**
- * For the framework to setup the job output during initialization
+ * For the framework to setup the job output during initialization. This is
+ * called from the application master process for the entire job.
*
* @param jobContext Context of the job whose output is being written.
* @throws IOException if temporary output could not be created
@@ -69,11 +73,12 @@ public abstract class OutputCommitter {
public abstract void setupJob(JobContext jobContext) throws IOException;
/**
- * For cleaning up the job's output after job completion
+ * For cleaning up the job's output after job completion. This is called
+ * from the application master process for the entire job.
*
* @param jobContext Context of the job whose output is being written.
* @throws IOException
- * @deprecated Use {@link #commitJob(JobContext)} or
+ * @deprecated Use {@link #commitJob(JobContext)} and
* {@link #abortJob(JobContext, JobStatus.State)} instead.
*/
@Deprecated
@@ -81,7 +86,8 @@ public abstract class OutputCommitter {
/**
* For committing job's output after successful job completion. Note that this
- * is invoked for jobs with final runstate as SUCCESSFUL.
+ * is invoked for jobs with final runstate as SUCCESSFUL. This is called
+ * from the application master process for the entire job.
*
* @param jobContext Context of the job whose output is being written.
* @throws IOException
@@ -94,7 +100,8 @@ public abstract class OutputCommitter {
/**
* For aborting an unsuccessful job's output. Note that this is invoked for
* jobs with final runstate as {@link JobStatus.State#FAILED} or
- * {@link JobStatus.State#KILLED}.
+ * {@link JobStatus.State#KILLED}. This is called from the application
+ * master process for the entire job.
*
* @param jobContext Context of the job whose output is being written.
* @param state final runstate of the job
@@ -106,7 +113,8 @@ public abstract class OutputCommitter {
}
/**
- * Sets up output for the task.
+ * Sets up output for the task. This is called from each individual task's
+ * process that will output to HDFS, and it is called just for that task.
*
* @param taskContext Context of the task whose output is being written.
* @throws IOException
@@ -115,7 +123,9 @@ public abstract class OutputCommitter {
throws IOException;
/**
- * Check whether task needs a commit
+ * Check whether task needs a commit. This is called from each individual
+ * task's process that will output to HDFS, and it is called just for that
+ * task.
*
* @param taskContext
* @return true/false
@@ -125,18 +135,23 @@ public abstract class OutputCommitter {
throws IOException;
/**
- * To promote the task's temporary output to final output location
- *
- * The task's output is moved to the job's output directory.
+ * To promote the task's temporary output to final output location.
+ * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
+ * task is the task that the AM determines finished first, this method
+ * is called to commit an individual task's output. This is to mark
+ * that task's output as complete, as {@link #commitJob(JobContext)} will
+ * also be called later on if the entire job finished successfully. This
+ * is called from a task's process.
*
* @param taskContext Context of the task whose output is being written.
- * @throws IOException if commit is not
+ * @throws IOException if commit is not successful.
*/
public abstract void commitTask(TaskAttemptContext taskContext)
throws IOException;
/**
- * Discard the task output
+ * Discard the task output. This is called from a task's process to clean
+ * up a single task's output that has not yet been committed.
*
* @param taskContext
* @throws IOException
@@ -164,7 +179,8 @@ public abstract class OutputCommitter {
* The retry-count for the job will be passed via the
* {@link MRJobConfig#APPLICATION_ATTEMPT_ID} key in
* {@link TaskAttemptContext#getConfiguration()} for the
- * <code>OutputCommitter</code>.
+ * <code>OutputCommitter</code>. This is called from the application master
+ * process, but it is called individually for each task.
*
* If an exception is thrown the task will be attempted again.
*
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -19,16 +19,16 @@
package org.apache.hadoop.mapreduce.lib.output;
import java.io.IOException;
-import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -37,41 +37,239 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.TaskAttemptID;
/** An {@link OutputCommitter} that commits files specified
- * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
+ * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
**/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileOutputCommitter extends OutputCommitter {
-
private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
- /**
- * Temporary directory name
+ /**
+ * Name of directory where pending data is placed. Data that has not been
+ * committed yet.
*/
- protected static final String TEMP_DIR_NAME = "_temporary";
+ public static final String PENDING_DIR_NAME = "_temporary";
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
- static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+ public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
- private FileSystem outputFileSystem = null;
private Path outputPath = null;
private Path workPath = null;
/**
* Create a file output committer
- * @param outputPath the job's output path
+ * @param outputPath the job's output path, or null if you want the output
+ * committer to act as a noop.
* @param context the task's context
* @throws IOException
*/
public FileOutputCommitter(Path outputPath,
TaskAttemptContext context) throws IOException {
+ this(outputPath, (JobContext)context);
+ if (outputPath != null) {
+ workPath = getTaskAttemptPath(context, outputPath);
+ }
+ }
+
+ /**
+ * Create a file output committer
+ * @param outputPath the job's output path, or null if you want the output
+ * committer to act as a noop.
+ * @param context the job's context
+ * @throws IOException
+ */
+ @Private
+ public FileOutputCommitter(Path outputPath,
+ JobContext context) throws IOException {
if (outputPath != null) {
- this.outputPath = outputPath;
- outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
- workPath = new Path(outputPath,
- getTaskAttemptBaseDirName(context))
- .makeQualified(outputFileSystem);
+ FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+ this.outputPath = fs.makeQualified(outputPath);
}
}
+
+ /**
+ * @return the path where final output of the job should be placed. This
+ * could also be considered the committed application attempt path.
+ */
+ private Path getOutputPath() {
+ return this.outputPath;
+ }
+
+ /**
+ * @return true if we have an output path set, else false.
+ */
+ private boolean hasOutputPath() {
+ return this.outputPath != null;
+ }
+
+ /**
+ * @return the path where the output of pending job attempts are
+ * stored.
+ */
+ private Path getPendingJobAttemptsPath() {
+ return getPendingJobAttemptsPath(getOutputPath());
+ }
+
+ /**
+ * Get the location of pending job attempts.
+ * @param out the base output directory.
+ * @return the location of pending job attempts.
+ */
+ private static Path getPendingJobAttemptsPath(Path out) {
+ return new Path(out, PENDING_DIR_NAME);
+ }
+
+ /**
+ * Get the Application Attempt Id for this job
+ * @param context the context to look in
+ * @return the Application Attempt Id for a given job.
+ */
+ private static int getAppAttemptId(JobContext context) {
+ return context.getConfiguration().getInt(
+ MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+ }
+
+ /**
+ * Compute the path where the output of a given job attempt will be placed.
+ * @param context the context of the job. This is used to get the
+ * application attempt id.
+ * @return the path to store job attempt data.
+ */
+ public Path getJobAttemptPath(JobContext context) {
+ return getJobAttemptPath(context, getOutputPath());
+ }
+
+ /**
+ * Compute the path where the output of a given job attempt will be placed.
+ * @param context the context of the job. This is used to get the
+ * application attempt id.
+ * @param out the output path to place these in.
+ * @return the path to store job attempt data.
+ */
+ public static Path getJobAttemptPath(JobContext context, Path out) {
+ return getJobAttemptPath(getAppAttemptId(context), out);
+ }
+
+ /**
+ * Compute the path where the output of a given job attempt will be placed.
+ * @param appAttemptId the ID of the application attempt for this job.
+ * @return the path to store job attempt data.
+ */
+ private Path getJobAttemptPath(int appAttemptId) {
+ return getJobAttemptPath(appAttemptId, getOutputPath());
+ }
+
+ /**
+ * Compute the path where the output of a given job attempt will be placed.
+ * @param appAttemptId the ID of the application attempt for this job.
+ * @return the path to store job attempt data.
+ */
+ private static Path getJobAttemptPath(int appAttemptId, Path out) {
+ return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
+ }
+
+ /**
+ * Compute the path where the output of pending task attempts are stored.
+ * @param context the context of the job with pending tasks.
+ * @return the path where the output of pending task attempts are stored.
+ */
+ private Path getPendingTaskAttemptsPath(JobContext context) {
+ return getPendingTaskAttemptsPath(context, getOutputPath());
+ }
+
+ /**
+ * Compute the path where the output of pending task attempts are stored.
+ * @param context the context of the job with pending tasks.
+ * @return the path where the output of pending task attempts are stored.
+ */
+ private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
+ return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
+ }
+
+ /**
+ * Compute the path where the output of a task attempt is stored until
+ * that task is committed.
+ *
+ * @param context the context of the task attempt.
+ * @return the path where a task attempt should be stored.
+ */
+ public Path getTaskAttemptPath(TaskAttemptContext context) {
+ return new Path(getPendingTaskAttemptsPath(context),
+ String.valueOf(context.getTaskAttemptID()));
+ }
+
+ /**
+ * Compute the path where the output of a task attempt is stored until
+ * that task is committed.
+ *
+ * @param context the context of the task attempt.
+ * @param out The output path to put things in.
+ * @return the path where a task attempt should be stored.
+ */
+ public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
+ return new Path(getPendingTaskAttemptsPath(context, out),
+ String.valueOf(context.getTaskAttemptID()));
+ }
+
+ /**
+ * Compute the path where the output of a committed task is stored until
+ * the entire job is committed.
+ * @param context the context of the task attempt
+ * @return the path where the output of a committed task is stored until
+ * the entire job is committed.
+ */
+ public Path getCommittedTaskPath(TaskAttemptContext context) {
+ return getCommittedTaskPath(getAppAttemptId(context), context);
+ }
+
+ public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
+ return getCommittedTaskPath(getAppAttemptId(context), context, out);
+ }
+
+ /**
+ * Compute the path where the output of a committed task is stored until the
+ * entire job is committed for a specific application attempt.
+ * @param appAttemptId the id of the application attempt to use
+ * @param context the context of any task.
+ * @return the path where the output of a committed task is stored.
+ */
+ private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
+ return new Path(getJobAttemptPath(appAttemptId),
+ String.valueOf(context.getTaskAttemptID().getTaskID()));
+ }
+
+ private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
+ return new Path(getJobAttemptPath(appAttemptId, out),
+ String.valueOf(context.getTaskAttemptID().getTaskID()));
+ }
+
+ private static class CommittedTaskFilter implements PathFilter {
+ @Override
+ public boolean accept(Path path) {
+ return !PENDING_DIR_NAME.equals(path.getName());
+ }
+ }
+
+ /**
+ * Get a list of all paths where output from committed tasks are stored.
+ * @param context the context of the current job
+ * @return the list of these Paths/FileStatuses.
+ * @throws IOException
+ */
+ private FileStatus[] getAllCommittedTaskPaths(JobContext context)
+ throws IOException {
+ Path jobAttemptPath = getJobAttemptPath(context);
+ FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration());
+ return fs.listStatus(jobAttemptPath, new CommittedTaskFilter());
+ }
+
+ /**
+ * Get the directory that the task should write results into.
+ * @return the work directory
+ * @throws IOException
+ */
+ public Path getWorkPath() throws IOException {
+ return workPath;
+ }
/**
* Create the temporary directory that is the root of all of the task
@@ -79,116 +277,103 @@ public class FileOutputCommitter extends
* @param context the job's context
*/
public void setupJob(JobContext context) throws IOException {
- if (outputPath != null) {
- Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
- if (!fileSys.mkdirs(tmpDir)) {
- LOG.error("Mkdirs failed to create " + tmpDir.toString());
+ if (hasOutputPath()) {
+ Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
+ FileSystem fs = pendingJobAttemptsPath.getFileSystem(
+ context.getConfiguration());
+ if (!fs.mkdirs(pendingJobAttemptsPath)) {
+ LOG.error("Mkdirs failed to create " + pendingJobAttemptsPath);
}
- }
- }
-
- // True if the job requires output.dir marked on successful job.
- // Note that by default it is set to true.
- private boolean shouldMarkOutputDir(Configuration conf) {
- return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
- }
-
- // Create a _success file in the job's output dir
- private void markOutputDirSuccessful(MRJobConfig context) throws IOException {
- if (outputPath != null) {
- // create a file in the output folder to mark the job completion
- Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
- outputFileSystem.create(filePath).close();
+ } else {
+ LOG.warn("Output Path is null in setupJob()");
}
}
/**
- * Move all job output to the final place.
+ * The job has completed so move all committed tasks to the final output dir.
* Delete the temporary directory, including all of the work directories.
* Create a _SUCCESS file to make it as successful.
* @param context the job's context
*/
public void commitJob(JobContext context) throws IOException {
- if (outputPath != null) {
- //delete the task temp directory from the current jobtempdir
- Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
- if (fileSys.exists(tmpDir)) {
- fileSys.delete(tmpDir, true);
- } else {
- LOG.warn("Task temp dir could not be deleted " + tmpDir);
+ if (hasOutputPath()) {
+ Path finalOutput = getOutputPath();
+ FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
+ for(FileStatus stat: getAllCommittedTaskPaths(context)) {
+ mergePaths(fs, stat, finalOutput);
}
- //move the job output to final place
- Path jobOutputPath =
- new Path(outputPath, getJobAttemptBaseDirName(context));
- moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
-
// delete the _temporary folder and create a _done file in the o/p folder
cleanupJob(context);
- if (shouldMarkOutputDir(context.getConfiguration())) {
- markOutputDirSuccessful(context);
+ // True if the job requires output.dir marked on successful job.
+ // Note that by default it is set to true.
+ if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
+ Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+ fs.create(markerPath).close();
}
+ } else {
+ LOG.warn("Output Path is null in commitJob()");
}
}
/**
- * Move job output to final location
- * @param fs Filesystem handle
- * @param origJobOutputPath The original location of the job output
- * Required to generate the relative path for correct moving of data.
- * @param finalOutputDir The final output directory to which the job output
- * needs to be moved
- * @param jobOutput The current job output directory being moved
- * @throws IOException
- */
- private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
- Path finalOutputDir, Path jobOutput) throws IOException {
- LOG.debug("Told to move job output from " + jobOutput
- + " to " + finalOutputDir +
- " and orig job output path is " + origJobOutputPath);
- if (fs.isFile(jobOutput)) {
- Path finalOutputPath =
- getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
- if (!fs.rename(jobOutput, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new IOException("Failed to delete earlier output of job");
- }
- if (!fs.rename(jobOutput, finalOutputPath)) {
- throw new IOException("Failed to save output of job");
- }
- }
- LOG.debug("Moved job output file from " + jobOutput + " to " +
- finalOutputPath);
- } else if (fs.getFileStatus(jobOutput).isDirectory()) {
- LOG.debug("Job output file " + jobOutput + " is a dir");
- FileStatus[] paths = fs.listStatus(jobOutput);
- Path finalOutputPath =
- getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
- fs.mkdirs(finalOutputPath);
- LOG.debug("Creating dirs along job output path " + finalOutputPath);
- if (paths != null) {
- for (FileStatus path : paths) {
- moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
- }
- }
- }
+ * Merge two paths together. Anything in from will be moved into to; if there
+ * are any name conflicts while merging, the files or directories in from win.
+ * @param fs the File System to use
+ * @param from the path data is coming from.
+ * @param to the path data is going to.
+ * @throws IOException on any error
+ */
+ private static void mergePaths(FileSystem fs, final FileStatus from,
+ final Path to)
+ throws IOException {
+ LOG.debug("Merging data from "+from+" to "+to);
+ if(from.isFile()) {
+ if(fs.exists(to)) {
+ if(!fs.delete(to, true)) {
+ throw new IOException("Failed to delete "+to);
+ }
+ }
+
+ if(!fs.rename(from.getPath(), to)) {
+ throw new IOException("Failed to rename "+from+" to "+to);
+ }
+ } else if(from.isDirectory()) {
+ if(fs.exists(to)) {
+ FileStatus toStat = fs.getFileStatus(to);
+ if(!toStat.isDirectory()) {
+ if(!fs.delete(to, true)) {
+ throw new IOException("Failed to delete "+to);
+ }
+ if(!fs.rename(from.getPath(), to)) {
+ throw new IOException("Failed to rename "+from+" to "+to);
+ }
+ } else {
+ //It is a directory so merge everything in the directories
+ for(FileStatus subFrom: fs.listStatus(from.getPath())) {
+ Path subTo = new Path(to, subFrom.getPath().getName());
+ mergePaths(fs, subFrom, subTo);
+ }
+ }
+ } else {
+ //it does not exist just rename
+ if(!fs.rename(from.getPath(), to)) {
+ throw new IOException("Failed to rename "+from+" to "+to);
+ }
+ }
+ }
}
@Override
@Deprecated
public void cleanupJob(JobContext context) throws IOException {
- if (outputPath != null) {
- Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
- if (fileSys.exists(tmpDir)) {
- fileSys.delete(tmpDir, true);
- }
+ if (hasOutputPath()) {
+ Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
+ FileSystem fs = pendingJobAttemptsPath
+ .getFileSystem(context.getConfiguration());
+ fs.delete(pendingJobAttemptsPath, true);
} else {
- LOG.warn("Output Path is null in cleanup");
+ LOG.warn("Output Path is null in cleanupJob()");
}
}
@@ -217,69 +402,40 @@ public class FileOutputCommitter extends
* Move the files from the work directory to the job output directory
* @param context the task context
*/
+ @Override
public void commitTask(TaskAttemptContext context)
throws IOException {
- TaskAttemptID attemptId = context.getTaskAttemptID();
- if (workPath != null) {
- context.progress();
- if (outputFileSystem.exists(workPath)) {
- // Move the task outputs to the current job attempt output dir
- Path jobOutputPath =
- new Path(outputPath, getJobAttemptBaseDirName(context));
- moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
- // Delete the temporary task-specific output directory
- if (!outputFileSystem.delete(workPath, true)) {
- LOG.warn("Failed to delete the temporary output" +
- " directory of task: " + attemptId + " - " + workPath);
- }
- LOG.info("Saved output of task '" + attemptId + "' to " +
- jobOutputPath);
- }
- }
+ commitTask(context, null);
}
- /**
- * Move all of the files from the work directory to the final output
- * @param context the task context
- * @param fs the output file system
- * @param jobOutputDir the final output direcotry
- * @param taskOutput the work path
- * @throws IOException
- */
- private void moveTaskOutputs(TaskAttemptContext context,
- FileSystem fs,
- Path jobOutputDir,
- Path taskOutput)
+ @Private
+ public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID();
- context.progress();
- LOG.debug("Told to move taskoutput from " + taskOutput
- + " to " + jobOutputDir);
- if (fs.isFile(taskOutput)) {
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
- workPath);
- if (!fs.rename(taskOutput, finalOutputPath)) {
- if (!fs.delete(finalOutputPath, true)) {
- throw new IOException("Failed to delete earlier output of task: " +
- attemptId);
- }
- if (!fs.rename(taskOutput, finalOutputPath)) {
- throw new IOException("Failed to save output of task: " +
- attemptId);
- }
+ if (hasOutputPath()) {
+ context.progress();
+ if(taskAttemptPath == null) {
+ taskAttemptPath = getTaskAttemptPath(context);
}
- LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
- } else if(fs.getFileStatus(taskOutput).isDirectory()) {
- LOG.debug("Taskoutput " + taskOutput + " is a dir");
- FileStatus[] paths = fs.listStatus(taskOutput);
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
- fs.mkdirs(finalOutputPath);
- LOG.debug("Creating dirs along path " + finalOutputPath);
- if (paths != null) {
- for (FileStatus path : paths) {
- moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
+ Path committedTaskPath = getCommittedTaskPath(context);
+ FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+ if (fs.exists(taskAttemptPath)) {
+ if(fs.exists(committedTaskPath)) {
+ if(!fs.delete(committedTaskPath, true)) {
+ throw new IOException("Could not delete " + committedTaskPath);
+ }
+ }
+ if(!fs.rename(taskAttemptPath, committedTaskPath)) {
+ throw new IOException("Could not rename " + taskAttemptPath + " to "
+ + committedTaskPath);
}
+ LOG.info("Saved output of task '" + attemptId + "' to " +
+ committedTaskPath);
+ } else {
+ LOG.warn("No Output found for " + attemptId);
}
+ } else {
+ LOG.warn("Output Path is null in commitTask()");
}
}
@@ -289,38 +445,22 @@ public class FileOutputCommitter extends
*/
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
- if (workPath != null) {
- context.progress();
- outputFileSystem.delete(workPath, true);
- }
+ abortTask(context, null);
}
- /**
- * Find the final name of a given output file, given the job output directory
- * and the work directory.
- * @param jobOutputDir the job's output directory
- * @param taskOutput the specific task output file
- * @param taskOutputPath the job's work directory
- * @return the final path for the specific output file
- * @throws IOException
- */
- private Path getFinalPath(Path jobOutputDir, Path taskOutput,
- Path taskOutputPath) throws IOException {
- URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(),
- outputFileSystem.getWorkingDirectory()).toUri();
- URI taskOutputPathUri =
- taskOutputPath.makeQualified(
- outputFileSystem.getUri(),
- outputFileSystem.getWorkingDirectory()).toUri();
- URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
- if (taskOutputUri == relativePath) {
- throw new IOException("Can not get the relative path: base = " +
- taskOutputPathUri + " child = " + taskOutputUri);
- }
- if (relativePath.getPath().length() > 0) {
- return new Path(jobOutputDir, relativePath.getPath());
+ @Private
+ public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
+ if (hasOutputPath()) {
+ context.progress();
+ if(taskAttemptPath == null) {
+ taskAttemptPath = getTaskAttemptPath(context);
+ }
+ FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+ if(!fs.delete(taskAttemptPath, true)) {
+ LOG.warn("Could not delete "+taskAttemptPath);
+ }
} else {
- return jobOutputDir;
+ LOG.warn("Output Path is null in abortTask()");
}
}
@@ -331,16 +471,20 @@ public class FileOutputCommitter extends
@Override
public boolean needsTaskCommit(TaskAttemptContext context
) throws IOException {
- return workPath != null && outputFileSystem.exists(workPath);
+ return needsTaskCommit(context, null);
}
- /**
- * Get the directory that the task should write results into
- * @return the work directory
- * @throws IOException
- */
- public Path getWorkPath() throws IOException {
- return workPath;
+ @Private
+ public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
+ ) throws IOException {
+ if(hasOutputPath()) {
+ if(taskAttemptPath == null) {
+ taskAttemptPath = getTaskAttemptPath(context);
+ }
+ FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+ return fs.exists(taskAttemptPath);
+ }
+ return false;
}
@Override
@@ -352,43 +496,35 @@ public class FileOutputCommitter extends
public void recoverTask(TaskAttemptContext context)
throws IOException {
context.progress();
- Path jobOutputPath =
- new Path(outputPath, getJobAttemptBaseDirName(context));
- int previousAttempt =
- context.getConfiguration().getInt(
- MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
+ TaskAttemptID attemptId = context.getTaskAttemptID();
+ int previousAttempt = getAppAttemptId(context) - 1;
if (previousAttempt < 0) {
throw new IOException ("Cannot recover task output for first attempt...");
}
-
- Path pathToRecover =
- new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
- LOG.debug("Trying to recover task from " + pathToRecover
- + " into " + jobOutputPath);
- if (outputFileSystem.exists(pathToRecover)) {
- // Move the task outputs to their final place
- moveJobOutputs(outputFileSystem,
- pathToRecover, jobOutputPath, pathToRecover);
- LOG.info("Saved output of job to " + jobOutputPath);
+
+ Path committedTaskPath = getCommittedTaskPath(context);
+ Path previousCommittedTaskPath = getCommittedTaskPath(
+ previousAttempt, context);
+ FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
+
+ LOG.debug("Trying to recover task from " + previousCommittedTaskPath
+ + " into " + committedTaskPath);
+ if (fs.exists(previousCommittedTaskPath)) {
+ if(fs.exists(committedTaskPath)) {
+ if(!fs.delete(committedTaskPath, true)) {
+ throw new IOException("Could not delete "+committedTaskPath);
+ }
+ }
+ //Rename can fail if the parent directory does not yet exist.
+ Path committedParent = committedTaskPath.getParent();
+ fs.mkdirs(committedParent);
+ if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+ throw new IOException("Could not rename " + previousCommittedTaskPath +
+ " to " + committedTaskPath);
+ }
+ LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
+ } else {
+ LOG.warn(attemptId+" had no output to recover.");
}
}
-
- protected static String getJobAttemptBaseDirName(JobContext context) {
- int appAttemptId =
- context.getConfiguration().getInt(
- MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
- return getJobAttemptBaseDirName(appAttemptId);
- }
-
- protected static String getJobAttemptBaseDirName(int appAttemptId) {
- return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
- + appAttemptId;
- }
-
- protected static String getTaskAttemptBaseDirName(
- TaskAttemptContext context) {
- return getJobAttemptBaseDirName(context) + Path.SEPARATOR +
- FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
- "_" + context.getTaskAttemptID().toString();
- }
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -105,10 +105,9 @@ public class TestFileOutputCommitter ext
// do commit
committer.commitTask(tContext);
- Path jobTempDir1 = new Path(outDir,
- FileOutputCommitter.getJobAttemptBaseDirName(
- conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
- assertTrue((new File(jobTempDir1.toString()).exists()));
+ Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+ File jtd1 = new File(jobTempDir1.toUri().getPath());
+ assertTrue(jtd1.exists());
validateContent(jobTempDir1);
//now while running the second app attempt,
@@ -119,14 +118,12 @@ public class TestFileOutputCommitter ext
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter();
- committer.setupJob(jContext2);
- Path jobTempDir2 = new Path(outDir,
- FileOutputCommitter.getJobAttemptBaseDirName(
- conf2.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
- assertTrue((new File(jobTempDir2.toString()).exists()));
+ committer2.setupJob(jContext2);
+ Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
- tContext2.getConfiguration().setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
committer2.recoverTask(tContext2);
+ File jtd2 = new File(jobTempDir2.toUri().getPath());
+ assertTrue(jtd2.exists());
validateContent(jobTempDir2);
committer2.commitJob(jContext2);
@@ -135,7 +132,8 @@ public class TestFileOutputCommitter ext
}
private void validateContent(Path dir) throws IOException {
- File expectedFile = new File(new Path(dir, partFile).toString());
+ File fdir = new File(dir.toUri().getPath());
+ File expectedFile = new File(fdir, partFile);
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
@@ -244,21 +242,17 @@ public class TestFileOutputCommitter ext
// do abort
committer.abortTask(tContext);
- FileSystem outputFileSystem = outDir.getFileSystem(conf);
- Path workPath = new Path(outDir,
- committer.getTaskAttemptBaseDirName(tContext))
- .makeQualified(outputFileSystem);
- File expectedFile = new File(new Path(workPath, partFile)
- .toString());
+ File out = new File(outDir.toUri().getPath());
+ Path workPath = committer.getWorkPath(tContext, outDir);
+ File wp = new File(workPath.toUri().getPath());
+ File expectedFile = new File(wp, partFile);
assertFalse("task temp dir still exists", expectedFile.exists());
committer.abortJob(jContext, JobStatus.State.FAILED);
- expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
- .toString());
+ expectedFile = new File(out, FileOutputCommitter.TEMP_DIR_NAME);
assertFalse("job temp dir still exists", expectedFile.exists());
- assertEquals("Output directory not empty", 0, new File(outDir.toString())
- .listFiles().length);
- FileUtil.fullyDelete(new File(outDir.toString()));
+ assertEquals("Output directory not empty", 0, out.listFiles().length);
+ FileUtil.fullyDelete(out);
}
public static class FakeFileSystem extends RawLocalFileSystem {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -60,6 +60,22 @@ public class TestFileOutputCommitter ext
private Text val2 = new Text("val2");
+ private static void cleanup() throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fs = outDir.getFileSystem(conf);
+ fs.delete(outDir, true);
+ }
+
+ @Override
+ public void setUp() throws IOException {
+ cleanup();
+ }
+
+ @Override
+ public void tearDown() throws IOException {
+ cleanup();
+ }
+
private void writeOutput(RecordWriter theRecordWriter,
TaskAttemptContext context) throws IOException, InterruptedException {
NullWritable nullWritable = NullWritable.get();
@@ -114,11 +130,10 @@ public class TestFileOutputCommitter ext
// do commit
committer.commitTask(tContext);
- Path jobTempDir1 = new Path(outDir,
- FileOutputCommitter.getJobAttemptBaseDirName(
- conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
- assertTrue((new File(jobTempDir1.toString()).exists()));
- validateContent(jobTempDir1);
+ Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+ File jtd = new File(jobTempDir1.toUri().getPath());
+ assertTrue(jtd.exists());
+ validateContent(jtd);
//now while running the second app attempt,
//recover the task output from first attempt
@@ -128,15 +143,13 @@ public class TestFileOutputCommitter ext
JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
- committer.setupJob(tContext2);
- Path jobTempDir2 = new Path(outDir,
- FileOutputCommitter.getJobAttemptBaseDirName(
- conf2.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
- assertTrue((new File(jobTempDir2.toString()).exists()));
+ committer2.setupJob(tContext2);
+ Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
+ File jtd2 = new File(jobTempDir2.toUri().getPath());
- tContext2.getConfiguration().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
committer2.recoverTask(tContext2);
- validateContent(jobTempDir2);
+ assertTrue(jtd2.exists());
+ validateContent(jtd2);
committer2.commitJob(jContext2);
validateContent(outDir);
@@ -144,7 +157,12 @@ public class TestFileOutputCommitter ext
}
private void validateContent(Path dir) throws IOException {
- File expectedFile = new File(new Path(dir, partFile).toString());
+ validateContent(new File(dir.toUri().getPath()));
+ }
+
+ private void validateContent(File dir) throws IOException {
+ File expectedFile = new File(dir, partFile);
+ assertTrue("Could not find "+expectedFile, expectedFile.exists());
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
@@ -259,7 +277,7 @@ public class TestFileOutputCommitter ext
assertFalse("task temp dir still exists", expectedFile.exists());
committer.abortJob(jContext, JobStatus.State.FAILED);
- expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+ expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME)
.toString());
assertFalse("job temp dir still exists", expectedFile.exists());
assertEquals("Output directory not empty", 0, new File(outDir.toString())
@@ -315,12 +333,10 @@ public class TestFileOutputCommitter ext
assertNotNull(th);
assertTrue(th instanceof IOException);
assertTrue(th.getMessage().contains("fake delete failed"));
- File jobTmpDir = new File(new Path(outDir,
- FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
- conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0) +
- Path.SEPARATOR +
- FileOutputCommitter.TEMP_DIR_NAME).toString());
- File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+ Path jtd = committer.getJobAttemptPath(jContext);
+ File jobTmpDir = new File(jtd.toUri().getPath());
+ Path ttd = committer.getTaskAttemptPath(tContext);
+ File taskTmpDir = new File(ttd.toUri().getPath());
File expectedFile = new File(taskTmpDir, partFile);
assertTrue(expectedFile + " does not exists", expectedFile.exists());
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -74,7 +74,7 @@ public class TestFileOutputCommitter ext
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
FileOutputFormat.setWorkOutputPath(job,
- committer.getTempTaskOutputPath(tContext));
+ committer.getTaskAttemptPath(tContext));
committer.setupJob(jContext);
committer.setupTask(tContext);
@@ -115,7 +115,7 @@ public class TestFileOutputCommitter ext
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
FileOutputFormat.setWorkOutputPath(job, committer
- .getTempTaskOutputPath(tContext));
+ .getTaskAttemptPath(tContext));
// do setup
committer.setupJob(jContext);
@@ -134,13 +134,13 @@ public class TestFileOutputCommitter ext
// do abort
committer.abortTask(tContext);
File expectedFile = new File(new Path(committer
- .getTempTaskOutputPath(tContext), file).toString());
+ .getTaskAttemptPath(tContext), file).toString());
assertFalse("task temp dir still exists", expectedFile.exists());
committer.abortJob(jContext, JobStatus.State.FAILED);
expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
.toString());
- assertFalse("job temp dir still exists", expectedFile.exists());
+ assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists());
assertEquals("Output directory not empty", 0, new File(outDir.toString())
.listFiles().length);
FileUtil.fullyDelete(new File(outDir.toString()));
@@ -170,16 +170,15 @@ public class TestFileOutputCommitter ext
TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
FileOutputCommitter committer = new FileOutputCommitter();
FileOutputFormat.setWorkOutputPath(job, committer
- .getTempTaskOutputPath(tContext));
+ .getTaskAttemptPath(tContext));
// do setup
committer.setupJob(jContext);
committer.setupTask(tContext);
String file = "test.txt";
- String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
- File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
- File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
+ File jobTmpDir = new File(committer.getJobAttemptPath(jContext).toUri().getPath());
+ File taskTmpDir = new File(committer.getTaskAttemptPath(tContext).toUri().getPath());
File expectedFile = new File(taskTmpDir, file);
// A reporter that does nothing
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java Sat Feb 4 00:06:24 2012
@@ -34,7 +34,7 @@ public class TestTaskCommit extends Hado
static class CommitterWithCommitFail extends FileOutputCommitter {
public void commitTask(TaskAttemptContext context) throws IOException {
- Path taskOutputPath = getTempTaskOutputPath(context);
+ Path taskOutputPath = getTaskAttemptPath(context);
TaskAttemptID attemptId = context.getTaskAttemptID();
JobConf job = context.getJobConf();
if (taskOutputPath != null) {