You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/10/11 20:27:51 UTC
svn commit: r1182008 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org...
Author: acmurthy
Date: Tue Oct 11 18:27:51 2011
New Revision: 1182008
URL: http://svn.apache.org/viewvc?rev=1182008&view=rev
Log:
MAPREDUCE-3148. Ported MAPREDUCE-2702 to old mapred api for aiding task recovery.
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Tue Oct 11 18:27:51 2011
@@ -362,6 +362,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3161. Improved some javadocs and fixed some typos in
YARN. (Todd Lipcon via vinodkv)
+ MAPREDUCE-3148. Ported MAPREDUCE-2702 to old mapred api for aiding task
+ recovery. (acmurthy)
+
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Tue Oct 11 18:27:51 2011
@@ -160,7 +160,10 @@
</Match>
<Match>
<Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
+ <Or>
<Method name="commitJob" />
+ <Method name="recoverTask" />
+ </Or>
<Bug pattern="NM_WRONG_PACKAGE" />
</Match>
<Match>
@@ -169,6 +172,7 @@
<Method name="abortJob" />
<Method name="commitJob" />
<Method name="cleanupJob" />
+ <Method name="recoverTask" />
</Or>
<Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
</Match>
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -38,7 +38,8 @@ public class FileOutputCommitter extends
public static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.mapred.FileOutputCommitter");
-/**
+
+ /**
* Name of the scratch directory created under the job output path.
* Job attempts stage output in {@code _temporary/<appAttemptId>}; task
* attempts use a further {@code _temporary} directory nested beneath that.
*/
public static final String TEMP_DIR_NAME = "_temporary";
@@ -50,7 +51,9 @@ public class FileOutputCommitter extends
JobConf conf = context.getJobConf();
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
- Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+ Path tmpDir =
+ new Path(outputPath, getJobAttemptBaseDirName(context) +
+ Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fileSys = tmpDir.getFileSystem(conf);
if (!fileSys.mkdirs(tmpDir)) {
LOG.error("Mkdirs failed to create " + tmpDir.toString());
@@ -65,6 +68,24 @@ public class FileOutputCommitter extends
}
public void commitJob(JobContext context) throws IOException {
+ //delete the task temp directory from the current jobtempdir
+ JobConf conf = context.getJobConf();
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+ Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+ Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+ FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+ if (fileSys.exists(tmpDir)) {
+ fileSys.delete(tmpDir, true);
+ } else {
+ LOG.warn("Task temp dir could not be deleted " + tmpDir);
+ }
+
+ //move the job output to final place
+ Path jobOutputPath =
+ new Path(outputPath, getJobAttemptBaseDirName(context));
+ moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
+
// delete the _temporary folder in the output folder
cleanupJob(context);
// check if the output-dir marking is required
@@ -88,6 +109,30 @@ public class FileOutputCommitter extends
}
}
+  /**
+   * Moves everything under {@code jobOutput} into {@code finalOutputDir},
+   * preserving the directory layout relative to {@code jobOutput}.
+   *
+   * @param fs file system holding both the source and the destination
+   * @param finalOutputDir directory that receives the output
+   * @param jobOutput file or directory whose contents are moved
+   * @throws IOException if a path cannot be listed, renamed or replaced
+   */
+  private void moveJobOutputs(FileSystem fs,
+      Path finalOutputDir, Path jobOutput) throws IOException {
+    // Destination paths must be computed relative to the top-level source.
+    // Using the current path as its own base yields an empty relative path,
+    // which flattens nested output into finalOutputDir, and on a failed
+    // rename would recursively delete finalOutputDir itself.
+    // Qualify the base so it matches the fully-qualified paths returned by
+    // listStatus (URI.relativize requires the same scheme and authority).
+    Path qualifiedJobOutput = fs.makeQualified(jobOutput);
+    moveJobOutputs(fs, qualifiedJobOutput, finalOutputDir,
+        qualifiedJobOutput);
+  }
+
+  /**
+   * Recursive worker for {@link #moveJobOutputs(FileSystem, Path, Path)}.
+   *
+   * @param fs file system holding both the source and the destination
+   * @param origJobOutput qualified top-level source; destination paths are
+   *        computed relative to it
+   * @param finalOutputDir directory that receives the output
+   * @param jobOutput the file or directory currently being moved
+   * @throws IOException if a path cannot be listed, renamed or replaced
+   */
+  private void moveJobOutputs(FileSystem fs, Path origJobOutput,
+      Path finalOutputDir, Path jobOutput) throws IOException {
+    if (fs.isFile(jobOutput)) {
+      Path finalOutputPath =
+          getFinalPath(finalOutputDir, jobOutput, origJobOutput);
+      if (!fs.rename(jobOutput, finalOutputPath)) {
+        // Something already occupies the destination: replace it and retry.
+        if (!fs.delete(finalOutputPath, true)) {
+          throw new IOException("Failed to delete earlier output of job");
+        }
+        if (!fs.rename(jobOutput, finalOutputPath)) {
+          throw new IOException("Failed to save output of job");
+        }
+      }
+      LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
+    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
+      FileStatus[] paths = fs.listStatus(jobOutput);
+      Path finalOutputPath =
+          getFinalPath(finalOutputDir, jobOutput, origJobOutput);
+      fs.mkdirs(finalOutputPath);
+      if (paths != null) {
+        for (FileStatus path : paths) {
+          moveJobOutputs(fs, origJobOutput, finalOutputDir, path.getPath());
+        }
+      }
+    }
+  }
@Override
@Deprecated
public void cleanupJob(JobContext context) throws IOException {
@@ -128,9 +173,14 @@ public class FileOutputCommitter extends
FileSystem fs = taskOutputPath.getFileSystem(job);
context.getProgressible().progress();
if (fs.exists(taskOutputPath)) {
- Path jobOutputPath = taskOutputPath.getParent().getParent();
- // Move the task outputs to their final place
- moveTaskOutputs(context, fs, jobOutputPath, taskOutputPath);
+ // Move the task outputs to the current job attempt output dir
+ JobConf conf = context.getJobConf();
+ Path outputPath = FileOutputFormat.getOutputPath(conf);
+ FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+ Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
+ moveTaskOutputs(context, outputFileSystem, jobOutputPath,
+ taskOutputPath);
+
// Delete the temporary task-specific output directory
if (!fs.delete(taskOutputPath, true)) {
LOG.info("Failed to delete the temporary output" +
@@ -189,7 +239,8 @@ public class FileOutputCommitter extends
Path taskOutputPath) throws IOException {
URI taskOutputUri = taskOutput.toUri();
URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
- if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
+ if (taskOutputUri == relativePath) {
+ //taskOutputPath is not a parent of taskOutput
throw new IOException("Can not get the relative path: base = " +
taskOutputPath + " child = " + taskOutput);
}
@@ -216,7 +267,8 @@ public class FileOutputCommitter extends
return false;
}
- Path getTempTaskOutputPath(TaskAttemptContext taskContext) throws IOException {
+ Path getTempTaskOutputPath(TaskAttemptContext taskContext)
+ throws IOException {
JobConf conf = taskContext.getJobConf();
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
@@ -247,4 +299,60 @@ public class FileOutputCommitter extends
}
return taskTmpDir;
}
+
+  /**
+   * Reports that this committer can recover task output written by a
+   * previous application attempt.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isRecoverySupported() {
+    return true;
+  }
+
+  /**
+   * Recovers task output left by the previous application attempt. The
+   * contents of that attempt's directory (as named by
+   * {@code getJobAttemptBaseDirName(int)}) under the job output path are
+   * moved into the current attempt's directory.
+   *
+   * Only logs a warning and returns when no output path is configured or
+   * when this is the first attempt. Does nothing when the previous attempt's
+   * directory does not exist.
+   *
+   * @param context context of the task whose output is being recovered
+   * @throws IOException if the previous attempt's output cannot be moved
+   */
+  @Override
+  public void recoverTask(TaskAttemptContext context)
+      throws IOException {
+    Path outputPath = FileOutputFormat.getOutputPath(context.getJobConf());
+    context.progress();
+    if (outputPath == null) {
+      // Without an output path there is no attempt directory to recover
+      // from; new Path(null, ...) would otherwise throw an NPE.
+      LOG.warn("Output path is not set; no task output to recover");
+      return;
+    }
+    int previousAttempt =
+        context.getConfiguration().getInt(
+            MRConstants.APPLICATION_ATTEMPT_ID, 0) - 1;
+    if (previousAttempt < 0) {
+      LOG.warn("Cannot recover task output for first attempt...");
+      return;
+    }
+
+    Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
+    FileSystem outputFileSystem =
+        outputPath.getFileSystem(context.getJobConf());
+    Path pathToRecover =
+        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
+    if (outputFileSystem.exists(pathToRecover)) {
+      // Move the previous attempt's output into this attempt's directory
+      moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
+      LOG.info("Saved output of job to " + jobOutputPath);
+    }
+  }
+
+  /**
+   * Returns the scratch directory name, relative to the job output path,
+   * for the application attempt recorded in the job's configuration.
+   * Attempt 0 is assumed when the key is unset.
+   *
+   * @param context the job whose attempt directory is wanted
+   * @return the attempt's directory name relative to the output path
+   */
+  protected static String getJobAttemptBaseDirName(JobContext context) {
+    JobConf jobConf = context.getJobConf();
+    return getJobAttemptBaseDirName(
+        jobConf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0));
+  }
+
+  /**
+   * Returns the current job-attempt directory name (relative to the job
+   * output path) as seen from a task. The attempt number is read from the
+   * task's configuration and defaults to 0.
+   *
+   * @param context the task attempt whose job-attempt directory is wanted
+   * @return the job-attempt directory name relative to the output path
+   */
+  protected static String getJobTempDirName(TaskAttemptContext context) {
+    JobConf jobConf = context.getJobConf();
+    return getJobAttemptBaseDirName(
+        jobConf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0));
+  }
+
+  /**
+   * Returns the scratch directory name for an application attempt,
+   * relative to the job output path: {@code _temporary/<appAttemptId>}.
+   *
+   * @param appAttemptId application attempt number
+   * @return the attempt's directory name relative to the output path
+   */
+  protected static String getJobAttemptBaseDirName(int appAttemptId) {
+    // Plain concatenation; the original "+ +" applied a no-op unary plus
+    // that read like a dropped operand.
+    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + appAttemptId;
+  }
+
+  /**
+   * Returns the scratch directory name for a task attempt, relative to the
+   * job output path. It is built from the current job-attempt directory,
+   * then {@code _temporary}, then {@code _<taskAttemptId>}.
+   *
+   * @param context the task attempt whose directory is wanted
+   * @return the task attempt's directory name relative to the output path
+   */
+  protected static String getTaskAttemptBaseDirName(
+      TaskAttemptContext context) {
+    StringBuilder dirName = new StringBuilder(getJobTempDirName(context));
+    dirName.append(Path.SEPARATOR)
+        .append(FileOutputCommitter.TEMP_DIR_NAME)
+        .append(Path.SEPARATOR)
+        .append('_')
+        .append(context.getTaskAttemptID().toString());
+    return dirName.toString();
+  }
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java Tue Oct 11 18:27:51 2011
@@ -60,4 +60,9 @@ public interface MRConstants {
/** Used in MRv1, mostly in TaskTracker code **/
public static final String WORKDIR = "work";
+
+  /**
+   * Configuration key holding the application attempt number (the job's
+   * retry count). Used by MRv2.
+   */
+  public static final String APPLICATION_ATTEMPT_ID =
+      "mapreduce.job.application.attempt.id";
+
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -152,6 +152,33 @@ public abstract class OutputCommitter
* is a bridge between the two.
*/
+  /**
+   * Returns whether this committer can recover task output from a previous
+   * application attempt via {@link #recoverTask(TaskAttemptContext)}.
+   *
+   * @return {@code false} in this default implementation
+   */
@Override
+  public boolean isRecoverySupported() {
+    return false;
+  }
+
+  /**
+   * Recover the task output.
+   *
+   * The retry-count for the job will be passed via the
+   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in
+   * {@link TaskAttemptContext#getConfiguration()} for the
+   * <code>OutputCommitter</code>.
+   *
+   * If an exception is thrown the task will be attempted again.
+   *
+   * The default implementation does nothing.
+   *
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException if the task output cannot be recovered
+   */
+  public void recoverTask(TaskAttemptContext taskContext)
+      throws IOException {
+    // Intentionally empty: by default there is nothing to recover.
+  }
+
+ /**
+ * This method implements the new interface by calling the old method. Note
+ * that the input types are different between the new and old apis and this
+ * is a bridge between the two.
+ */
+ @Override
public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
) throws IOException {
setupJob((JobContext) jobContext);
@@ -246,4 +273,17 @@ public abstract class OutputCommitter
) throws IOException {
abortTask((TaskAttemptContext) taskContext);
}
+
+ /**
+ * This method implements the new interface by calling the old method. Note
+ * that the input types are different between the new and old apis and this
+ * is a bridge between the two.
+ */
+ @Override
+ public final
+ void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
+ ) throws IOException {
+ recoverTask((TaskAttemptContext) taskContext);
+ }
+
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1182008&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+
+@SuppressWarnings("unchecked")
+public class TestFileOutputCommitter extends TestCase {
+ private static Path outDir = new Path(System.getProperty("test.build.data",
+ "/tmp"), "output");
+
+ // A random task attempt id for testing.
+ private static String attempt = "attempt_200707121733_0001_m_000000_0";
+ private static String partFile = "part-00000";
+ private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+ private Text key1 = new Text("key1");
+ private Text key2 = new Text("key2");
+ private Text val1 = new Text("val1");
+ private Text val2 = new Text("val2");
+
+
+ private void writeOutput(RecordWriter theRecordWriter,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ NullWritable nullWritable = NullWritable.get();
+
+ try {
+ theRecordWriter.write(key1, val1);
+ theRecordWriter.write(null, nullWritable);
+ theRecordWriter.write(null, val1);
+ theRecordWriter.write(nullWritable, val2);
+ theRecordWriter.write(key2, nullWritable);
+ theRecordWriter.write(key1, null);
+ theRecordWriter.write(null, null);
+ theRecordWriter.write(key2, val2);
+ } finally {
+ theRecordWriter.close(null);
+ }
+ }
+
+
+ public void testRecovery() throws Exception {
+ JobConf conf = new JobConf();
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ FileOutputCommitter committer = new FileOutputCommitter();
+
+ // setup
+ committer.setupJob(jContext);
+ committer.setupTask(tContext);
+
+ // write output
+ TextOutputFormat theOutputFormat = new TextOutputFormat();
+ RecordWriter theRecordWriter =
+ theOutputFormat.getRecordWriter(null, conf, partFile, null);
+ writeOutput(theRecordWriter, tContext);
+
+ // do commit
+ committer.commitTask(tContext);
+ Path jobTempDir1 = new Path(outDir,
+ FileOutputCommitter.getJobAttemptBaseDirName(
+ conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
+ assertTrue((new File(jobTempDir1.toString()).exists()));
+ validateContent(jobTempDir1);
+
+
+
+ //now while running the second app attempt,
+ //recover the task output from first attempt
+ JobConf conf2 = new JobConf(conf);
+ conf2.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ conf2.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
+ JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
+ TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
+ FileOutputCommitter committer2 = new FileOutputCommitter();
+ committer.setupJob(jContext2);
+ Path jobTempDir2 = new Path(outDir,
+ FileOutputCommitter.getJobAttemptBaseDirName(
+ conf2.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
+ assertTrue((new File(jobTempDir2.toString()).exists()));
+
+ tContext2.getConfiguration().setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
+ committer2.recoverTask(tContext2);
+ validateContent(jobTempDir2);
+
+ committer2.commitJob(jContext2);
+ validateContent(outDir);
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+
+ private void validateContent(Path dir) throws IOException {
+ File expectedFile = new File(new Path(dir, partFile).toString());
+ StringBuffer expectedOutput = new StringBuffer();
+ expectedOutput.append(key1).append('\t').append(val1).append("\n");
+ expectedOutput.append(val1).append("\n");
+ expectedOutput.append(val2).append("\n");
+ expectedOutput.append(key2).append("\n");
+ expectedOutput.append(key1).append("\n");
+ expectedOutput.append(key2).append('\t').append(val2).append("\n");
+ String output = slurp(expectedFile);
+ assertEquals(output, expectedOutput.toString());
+ }
+
+
+ public void testCommitter() throws Exception {
+ JobConf conf = new JobConf();
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ FileOutputCommitter committer = new FileOutputCommitter();
+
+ // setup
+ committer.setupJob(jContext);
+ committer.setupTask(tContext);
+
+ // write output
+ TextOutputFormat theOutputFormat = new TextOutputFormat();
+ RecordWriter theRecordWriter =
+ theOutputFormat.getRecordWriter(null, conf, partFile, null);
+ writeOutput(theRecordWriter, tContext);
+
+ // do commit
+ committer.commitTask(tContext);
+ committer.commitJob(jContext);
+
+ // validate output
+ validateContent(outDir);
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+
+
+ public void testAbort() throws IOException, InterruptedException {
+ JobConf conf = new JobConf();
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ FileOutputCommitter committer = new FileOutputCommitter();
+
+ // do setup
+ committer.setupJob(jContext);
+ committer.setupTask(tContext);
+
+ // write output
+ TextOutputFormat theOutputFormat = new TextOutputFormat();
+ RecordWriter theRecordWriter =
+ theOutputFormat.getRecordWriter(null, conf, partFile, null);
+ writeOutput(theRecordWriter, tContext);
+
+ // do abort
+ committer.abortTask(tContext);
+ FileSystem outputFileSystem = outDir.getFileSystem(conf);
+ Path workPath = new Path(outDir,
+ committer.getTaskAttemptBaseDirName(tContext))
+ .makeQualified(outputFileSystem);
+ File expectedFile = new File(new Path(workPath, partFile)
+ .toString());
+ assertFalse("task temp dir still exists", expectedFile.exists());
+
+ committer.abortJob(jContext, JobStatus.State.FAILED);
+ expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+ .toString());
+ assertFalse("job temp dir still exists", expectedFile.exists());
+ assertEquals("Output directory not empty", 0, new File(outDir.toString())
+ .listFiles().length);
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+
+ public static class FakeFileSystem extends RawLocalFileSystem {
+ public FakeFileSystem() {
+ super();
+ }
+
+ public URI getUri() {
+ return URI.create("faildel:///");
+ }
+
+ @Override
+ public boolean delete(Path p, boolean recursive) throws IOException {
+ throw new IOException("fake delete failed");
+ }
+ }
+
+
+ public void testFailAbort() throws IOException, InterruptedException {
+ JobConf conf = new JobConf();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "faildel:///");
+ conf.setClass("fs.faildel.impl", FakeFileSystem.class, FileSystem.class);
+ conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ conf.setInt(MRConstants.APPLICATION_ATTEMPT_ID, 1);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ FileOutputCommitter committer = new FileOutputCommitter();
+
+ // do setup
+ committer.setupJob(jContext);
+ committer.setupTask(tContext);
+
+ // write output
+ File jobTmpDir = new File(new Path(outDir,
+ FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+ conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0) +
+ Path.SEPARATOR +
+ FileOutputCommitter.TEMP_DIR_NAME).toString());
+ File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+ File expectedFile = new File(taskTmpDir, partFile);
+ TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+ RecordWriter<?, ?> theRecordWriter =
+ theOutputFormat.getRecordWriter(null, conf,
+ expectedFile.getAbsolutePath(), null);
+ writeOutput(theRecordWriter, tContext);
+
+ // do abort
+ Throwable th = null;
+ try {
+ committer.abortTask(tContext);
+ } catch (IOException ie) {
+ th = ie;
+ }
+ assertNotNull(th);
+ assertTrue(th instanceof IOException);
+ assertTrue(th.getMessage().contains("fake delete failed"));
+ assertTrue(expectedFile + " does not exists", expectedFile.exists());
+
+ th = null;
+ try {
+ committer.abortJob(jContext, JobStatus.State.FAILED);
+ } catch (IOException ie) {
+ th = ie;
+ }
+ assertNotNull(th);
+ assertTrue(th instanceof IOException);
+ assertTrue(th.getMessage().contains("fake delete failed"));
+ assertTrue("job temp dir does not exists", jobTmpDir.exists());
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+
+ public static String slurp(File f) throws IOException {
+ int len = (int) f.length();
+ byte[] buf = new byte[len];
+ FileInputStream in = new FileInputStream(f);
+ String contents = null;
+ try {
+ in.read(buf, 0, len);
+ contents = new String(buf, "UTF-8");
+ } finally {
+ in.close();
+ }
+ return contents;
+ }
+
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestFileOutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -175,7 +175,12 @@ public class TestFileOutputCommitter ext
// do setup
committer.setupJob(jContext);
committer.setupTask(tContext);
+
String file = "test.txt";
+ String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
+ File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
+ File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
+ File expectedFile = new File(taskTmpDir, file);
// A reporter that does nothing
Reporter reporter = Reporter.NULL;
@@ -183,7 +188,7 @@ public class TestFileOutputCommitter ext
FileSystem localFs = new FakeFileSystem();
TextOutputFormat theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(localFs,
- job, file, reporter);
+ job, expectedFile.getAbsolutePath(), reporter);
writeOutput(theRecordWriter, reporter);
// do abort
@@ -196,10 +201,6 @@ public class TestFileOutputCommitter ext
assertNotNull(th);
assertTrue(th instanceof IOException);
assertTrue(th.getMessage().contains("fake delete failed"));
- File jobTmpDir = new File(new Path(outDir,
- FileOutputCommitter.TEMP_DIR_NAME).toString());
- File taskTmpDir = new File(jobTmpDir, "_" + taskID);
- File expectedFile = new File(taskTmpDir, file);
assertTrue(expectedFile + " does not exists", expectedFile.exists());
th = null;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1182008&r1=1182007&r2=1182008&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Tue Oct 11 18:27:51 2011
@@ -188,9 +188,9 @@ public class TestFileOutputCommitter ext
assertNotNull(th);
assertTrue(th instanceof IOException);
assertTrue(th.getMessage().contains("fake delete failed"));
- String filename = committer.getTaskAttemptBaseDirName(tContext);
+ String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
- File taskTmpDir = new File(outDir.toString(), filename);
+ File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
File expectedFile = new File(taskTmpDir, partFile);
assertTrue(expectedFile + " does not exists", expectedFile.exists());