You are viewing a plain text version of this content. The link to the canonical version ("here") was not preserved in this plain-text rendering.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/10/14 03:16:31 UTC
svn commit: r1183185 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoo...
Author: acmurthy
Date: Fri Oct 14 01:16:30 2011
New Revision: 1183185
URL: http://svn.apache.org/viewvc?rev=1183185&view=rev
Log:
MAPREDUCE-3170. Fixed job output commit for deep hierarchies. Contributed by Hitesh Shah.
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Oct 14 01:16:30 2011
@@ -1608,6 +1608,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2789. Complete schedulingInfo on CLI. (Eric Payne via acmurthy)
+ MAPREDUCE-3170. Fixed job output commit for deep hierarchies. (Hitesh Shah
+ via acmurthy)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Fri Oct 14 01:16:30 2011
@@ -71,27 +71,30 @@ public class FileOutputCommitter extends
//delete the task temp directory from the current jobtempdir
JobConf conf = context.getJobConf();
Path outputPath = FileOutputFormat.getOutputPath(conf);
- FileSystem outputFileSystem = outputPath.getFileSystem(conf);
- Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
- if (fileSys.exists(tmpDir)) {
- fileSys.delete(tmpDir, true);
- } else {
- LOG.warn("Task temp dir could not be deleted " + tmpDir);
- }
-
- //move the job output to final place
- Path jobOutputPath =
- new Path(outputPath, getJobAttemptBaseDirName(context));
- moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
-
- // delete the _temporary folder in the output folder
- cleanupJob(context);
- // check if the output-dir marking is required
- if (shouldMarkOutputDir(context.getJobConf())) {
- // create a _success file in the output folder
- markOutputDirSuccessful(context);
+ if (outputPath != null) {
+ FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+ Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+ Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+ FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+ if (fileSys.exists(tmpDir)) {
+ fileSys.delete(tmpDir, true);
+ } else {
+ LOG.warn("Task temp dir could not be deleted " + tmpDir);
+ }
+
+ //move the job output to final place
+ Path jobOutputPath =
+ new Path(outputPath, getJobAttemptBaseDirName(context));
+ moveJobOutputs(outputFileSystem,
+ jobOutputPath, outputPath, jobOutputPath);
+
+ // delete the _temporary folder in the output folder
+ cleanupJob(context);
+ // check if the output-dir marking is required
+ if (shouldMarkOutputDir(context.getJobConf())) {
+ // create a _success file in the output folder
+ markOutputDirSuccessful(context);
+ }
}
}
@@ -109,10 +112,14 @@ public class FileOutputCommitter extends
}
}
- private void moveJobOutputs(FileSystem fs,
+ private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
Path finalOutputDir, Path jobOutput) throws IOException {
+ LOG.debug("Told to move job output from " + jobOutput
+ + " to " + finalOutputDir +
+ " and orig job output path is " + origJobOutputPath);
if (fs.isFile(jobOutput)) {
- Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+ Path finalOutputPath =
+ getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
if (!fs.rename(jobOutput, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) {
throw new IOException("Failed to delete earlier output of job");
@@ -121,18 +128,23 @@ public class FileOutputCommitter extends
throw new IOException("Failed to save output of job");
}
}
- LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
+ LOG.debug("Moved job output file from " + jobOutput + " to " +
+ finalOutputPath);
} else if (fs.getFileStatus(jobOutput).isDirectory()) {
+ LOG.debug("Job output file " + jobOutput + " is a dir");
FileStatus[] paths = fs.listStatus(jobOutput);
- Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+ Path finalOutputPath =
+ getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
fs.mkdirs(finalOutputPath);
+ LOG.debug("Creating dirs along job output path " + finalOutputPath);
if (paths != null) {
for (FileStatus path : paths) {
- moveJobOutputs(fs, finalOutputDir, path.getPath());
+ moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
}
}
}
}
+
@Override
@Deprecated
public void cleanupJob(JobContext context) throws IOException {
@@ -199,8 +211,10 @@ public class FileOutputCommitter extends
throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID();
context.getProgressible().progress();
+ LOG.debug("Told to move taskoutput from " + taskOutput
+ + " to " + jobOutputDir);
if (fs.isFile(taskOutput)) {
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
+ Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
getTempTaskOutputPath(context));
if (!fs.rename(taskOutput, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) {
@@ -214,10 +228,12 @@ public class FileOutputCommitter extends
}
LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
} else if(fs.getFileStatus(taskOutput).isDirectory()) {
+ LOG.debug("Taskoutput " + taskOutput + " is a dir");
FileStatus[] paths = fs.listStatus(taskOutput);
- Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
+ Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
getTempTaskOutputPath(context));
fs.mkdirs(finalOutputPath);
+ LOG.debug("Creating dirs along path " + finalOutputPath);
if (paths != null) {
for (FileStatus path : paths) {
moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
@@ -235,14 +251,16 @@ public class FileOutputCommitter extends
}
}
- private Path getFinalPath(Path jobOutputDir, Path taskOutput,
+ @SuppressWarnings("deprecation")
+ private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput,
Path taskOutputPath) throws IOException {
- URI taskOutputUri = taskOutput.toUri();
- URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
+ URI taskOutputUri = taskOutput.makeQualified(fs).toUri();
+ URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri();
+ URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
if (taskOutputUri == relativePath) {
//taskOutputPath is not a parent of taskOutput
throw new IOException("Can not get the relative path: base = " +
- taskOutputPath + " child = " + taskOutput);
+ taskOutputPathUri + " child = " + taskOutputUri);
}
if (relativePath.getPath().length() > 0) {
return new Path(jobOutputDir, relativePath.getPath());
@@ -325,7 +343,10 @@ public class FileOutputCommitter extends
new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
if (outputFileSystem.exists(pathToRecover)) {
// Move the task outputs to their final place
- moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
+ LOG.debug("Trying to recover task from " + pathToRecover
+ + " into " + jobOutputPath);
+ moveJobOutputs(outputFileSystem,
+ pathToRecover, jobOutputPath, pathToRecover);
LOG.info("Saved output of job to " + jobOutputPath);
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Fri Oct 14 01:16:30 2011
@@ -111,32 +111,48 @@ public class FileOutputCommitter extends
* @param context the job's context
*/
public void commitJob(JobContext context) throws IOException {
- //delete the task temp directory from the current jobtempdir
- Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
- Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
- FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
- if (fileSys.exists(tmpDir)) {
- fileSys.delete(tmpDir, true);
- } else {
- LOG.warn("Task temp dir could not be deleted " + tmpDir);
- }
-
- //move the job output to final place
- Path jobOutputPath =
- new Path(outputPath, getJobAttemptBaseDirName(context));
- moveJobOutputs(outputFileSystem, outputPath, jobOutputPath);
-
- // delete the _temporary folder and create a _done file in the o/p folder
- cleanupJob(context);
- if (shouldMarkOutputDir(context.getConfiguration())) {
- markOutputDirSuccessful(context);
+ if (outputPath != null) {
+ //delete the task temp directory from the current jobtempdir
+ Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+ Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+ FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+ if (fileSys.exists(tmpDir)) {
+ fileSys.delete(tmpDir, true);
+ } else {
+ LOG.warn("Task temp dir could not be deleted " + tmpDir);
+ }
+
+ //move the job output to final place
+ Path jobOutputPath =
+ new Path(outputPath, getJobAttemptBaseDirName(context));
+ moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
+
+ // delete the _temporary folder and create a _done file in the o/p folder
+ cleanupJob(context);
+ if (shouldMarkOutputDir(context.getConfiguration())) {
+ markOutputDirSuccessful(context);
+ }
}
}
- private void moveJobOutputs(FileSystem fs,
+ /**
+ * Move job output to final location
+ * @param fs Filesystem handle
+ * @param origJobOutputPath The original location of the job output
+ * Required to generate the relative path for correct moving of data.
+ * @param finalOutputDir The final output directory to which the job output
+ * needs to be moved
+ * @param jobOutput The current job output directory being moved
+ * @throws IOException
+ */
+ private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
Path finalOutputDir, Path jobOutput) throws IOException {
+ LOG.debug("Told to move job output from " + jobOutput
+ + " to " + finalOutputDir +
+ " and orig job output path is " + origJobOutputPath);
if (fs.isFile(jobOutput)) {
- Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+ Path finalOutputPath =
+ getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
if (!fs.rename(jobOutput, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) {
throw new IOException("Failed to delete earlier output of job");
@@ -145,14 +161,18 @@ public class FileOutputCommitter extends
throw new IOException("Failed to save output of job");
}
}
- LOG.debug("Moved " + jobOutput + " to " + finalOutputPath);
+ LOG.debug("Moved job output file from " + jobOutput + " to " +
+ finalOutputPath);
} else if (fs.getFileStatus(jobOutput).isDirectory()) {
+ LOG.debug("Job output file " + jobOutput + " is a dir");
FileStatus[] paths = fs.listStatus(jobOutput);
- Path finalOutputPath = getFinalPath(finalOutputDir, jobOutput, jobOutput);
+ Path finalOutputPath =
+ getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
fs.mkdirs(finalOutputPath);
+ LOG.debug("Creating dirs along job output path " + finalOutputPath);
if (paths != null) {
for (FileStatus path : paths) {
- moveJobOutputs(fs, finalOutputDir, path.getPath());
+ moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
}
}
}
@@ -233,6 +253,8 @@ public class FileOutputCommitter extends
throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID();
context.progress();
+ LOG.debug("Told to move taskoutput from " + taskOutput
+ + " to " + jobOutputDir);
if (fs.isFile(taskOutput)) {
Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
workPath);
@@ -248,9 +270,11 @@ public class FileOutputCommitter extends
}
LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
} else if(fs.getFileStatus(taskOutput).isDirectory()) {
+ LOG.debug("Taskoutput " + taskOutput + " is a dir");
FileStatus[] paths = fs.listStatus(taskOutput);
Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
fs.mkdirs(finalOutputPath);
+ LOG.debug("Creating dirs along path " + finalOutputPath);
if (paths != null) {
for (FileStatus path : paths) {
moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
@@ -281,12 +305,17 @@ public class FileOutputCommitter extends
* @throws IOException
*/
private Path getFinalPath(Path jobOutputDir, Path taskOutput,
- Path taskOutputPath) throws IOException {
- URI taskOutputUri = taskOutput.toUri();
- URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
+ Path taskOutputPath) throws IOException {
+ URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(),
+ outputFileSystem.getWorkingDirectory()).toUri();
+ URI taskOutputPathUri =
+ taskOutputPath.makeQualified(
+ outputFileSystem.getUri(),
+ outputFileSystem.getWorkingDirectory()).toUri();
+ URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
if (taskOutputUri == relativePath) {
throw new IOException("Can not get the relative path: base = " +
- taskOutputPath + " child = " + taskOutput);
+ taskOutputPathUri + " child = " + taskOutputUri);
}
if (relativePath.getPath().length() > 0) {
return new Path(jobOutputDir, relativePath.getPath());
@@ -334,9 +363,12 @@ public class FileOutputCommitter extends
Path pathToRecover =
new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
+ LOG.debug("Trying to recover task from " + pathToRecover
+ + " into " + jobOutputPath);
if (outputFileSystem.exists(pathToRecover)) {
// Move the task outputs to their final place
- moveJobOutputs(outputFileSystem, jobOutputPath, pathToRecover);
+ moveJobOutputs(outputFileSystem,
+ pathToRecover, jobOutputPath, pathToRecover);
LOG.info("Saved output of job to " + jobOutputPath);
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Fri Oct 14 01:16:30 2011
@@ -25,13 +25,17 @@ import java.net.URI;
import junit.framework.TestCase;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+
@SuppressWarnings("unchecked")
public class TestFileOutputCommitter extends TestCase {
private static Path outDir = new Path(System.getProperty("test.build.data",
@@ -65,6 +69,20 @@ public class TestFileOutputCommitter ext
}
}
+ /**
+ * Writes ten records to the given MapFile record writer and always closes it.
+ * Keys are the LongWritables 0..9 in ascending order; odd keys get val1 and
+ * even keys get val2.
+ * @param theRecordWriter the writer to populate and close
+ * @param context the task attempt context (not used here; the writer is
+ * closed with a null argument)
+ * @throws IOException if writing or closing fails
+ * @throws InterruptedException declared for signature compatibility
+ */
+ private void writeMapFileOutput(RecordWriter theRecordWriter,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ try {
+ for (int key = 0; key < 10; ++key) {
+ Text val = (key % 2 == 1) ? val1 : val2;
+ theRecordWriter.write(new LongWritable(key), val);
+ }
+ } finally {
+ theRecordWriter.close(null);
+ }
+ }
public void testRecovery() throws Exception {
JobConf conf = new JobConf();
@@ -91,9 +109,7 @@ public class TestFileOutputCommitter ext
FileOutputCommitter.getJobAttemptBaseDirName(
conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
assertTrue((new File(jobTempDir1.toString()).exists()));
- validateContent(jobTempDir1);
-
-
+ validateContent(jobTempDir1);
//now while running the second app attempt,
//recover the task output from first attempt
@@ -131,6 +147,29 @@ public class TestFileOutputCommitter ext
assertEquals(output, expectedOutput.toString());
}
+ /**
+ * Checks that the committed MapFile output under {@code dir} is a directory
+ * named {@code partFile} containing both the MapFile index and data files.
+ * Uses JUnit assertions: a Java {@code assert} statement is skipped unless
+ * the JVM runs with -ea, which would let this validation silently pass.
+ * @param fs filesystem holding the output
+ * @param dir the job's final output directory
+ * @throws IOException if the directory cannot be inspected
+ */
+ private void validateMapFileOutputContent(
+ FileSystem fs, Path dir) throws IOException {
+ // map output is a directory with index and data files
+ Path expectedMapDir = new Path(dir, partFile);
+ assertTrue("Expected a directory at " + expectedMapDir,
+ fs.getFileStatus(expectedMapDir).isDirectory());
+ FileStatus[] files = fs.listStatus(expectedMapDir);
+ int fileCount = 0;
+ boolean dataFileFound = false;
+ boolean indexFileFound = false;
+ for (FileStatus f : files) {
+ if (f.isFile()) {
+ ++fileCount;
+ String name = f.getPath().getName();
+ if (name.equals(MapFile.INDEX_FILE_NAME)) {
+ indexFileFound = true;
+ } else if (name.equals(MapFile.DATA_FILE_NAME)) {
+ dataFileFound = true;
+ }
+ }
+ }
+ assertTrue("No files found in " + expectedMapDir, fileCount > 0);
+ assertTrue("Missing MapFile index file in " + expectedMapDir, indexFileFound);
+ assertTrue("Missing MapFile data file in " + expectedMapDir, dataFileFound);
+ }
public void testCommitter() throws Exception {
JobConf conf = new JobConf();
@@ -159,6 +198,31 @@ public class TestFileOutputCommitter ext
FileUtil.fullyDelete(new File(outDir.toString()));
}
+ /**
+ * Commits a task and job whose output is a MapFile (a directory of index and
+ * data files rather than a single file) and validates that the directory and
+ * its contents end up under the final output path.
+ */
+ public void testMapFileOutputCommitter() throws Exception {
+ JobConf conf = new JobConf();
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ FileOutputCommitter committer = new FileOutputCommitter();
+
+ try {
+ // setup
+ committer.setupJob(jContext);
+ committer.setupTask(tContext);
+
+ // write output
+ MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
+ RecordWriter theRecordWriter =
+ theOutputFormat.getRecordWriter(null, conf, partFile, null);
+ writeMapFileOutput(theRecordWriter, tContext);
+
+ // do commit
+ committer.commitTask(tContext);
+ committer.commitJob(jContext);
+
+ // validate output
+ validateMapFileOutputContent(FileSystem.get(conf), outDir);
+ } finally {
+ // outDir is shared by every test in this class; clean it up even when a
+ // step above fails so leftovers cannot affect later tests.
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+ }
public void testAbort() throws IOException, InterruptedException {
JobConf conf = new JobConf();
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1183185&r1=1183184&r2=1183185&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Fri Oct 14 01:16:30 2011
@@ -26,10 +26,13 @@ import java.net.URI;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -75,6 +78,20 @@ public class TestFileOutputCommitter ext
}
}
+ /**
+ * Writes ten records to the given MapFile record writer and always closes it
+ * with the supplied task attempt context. Keys are the LongWritables 0..9 in
+ * ascending order; odd keys get val1 and even keys get val2.
+ * @param theRecordWriter the writer to populate and close
+ * @param context the task attempt context passed to close()
+ * @throws IOException if writing or closing fails
+ * @throws InterruptedException if the writer is interrupted
+ */
+ private void writeMapFileOutput(RecordWriter theRecordWriter,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ try {
+ for (int key = 0; key < 10; ++key) {
+ Text val = (key % 2 == 1) ? val1 : val2;
+ theRecordWriter.write(new LongWritable(key), val);
+ }
+ } finally {
+ theRecordWriter.close(context);
+ }
+ }
public void testRecovery() throws Exception {
Job job = Job.getInstance();
@@ -101,9 +118,7 @@ public class TestFileOutputCommitter ext
FileOutputCommitter.getJobAttemptBaseDirName(
conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
assertTrue((new File(jobTempDir1.toString()).exists()));
- validateContent(jobTempDir1);
-
-
+ validateContent(jobTempDir1);
//now while running the second app attempt,
//recover the task output from first attempt
@@ -141,6 +156,29 @@ public class TestFileOutputCommitter ext
assertEquals(output, expectedOutput.toString());
}
+ /**
+ * Checks that the committed MapFile output under {@code dir} is a directory
+ * named {@code partFile} containing both the MapFile index and data files.
+ * Uses JUnit assertions: a Java {@code assert} statement is skipped unless
+ * the JVM runs with -ea, which would let this validation silently pass.
+ * @param fs filesystem holding the output
+ * @param dir the job's final output directory
+ * @throws IOException if the directory cannot be inspected
+ */
+ private void validateMapFileOutputContent(
+ FileSystem fs, Path dir) throws IOException {
+ // map output is a directory with index and data files
+ Path expectedMapDir = new Path(dir, partFile);
+ assertTrue("Expected a directory at " + expectedMapDir,
+ fs.getFileStatus(expectedMapDir).isDirectory());
+ FileStatus[] files = fs.listStatus(expectedMapDir);
+ int fileCount = 0;
+ boolean dataFileFound = false;
+ boolean indexFileFound = false;
+ for (FileStatus f : files) {
+ if (f.isFile()) {
+ ++fileCount;
+ String name = f.getPath().getName();
+ if (name.equals(MapFile.INDEX_FILE_NAME)) {
+ indexFileFound = true;
+ } else if (name.equals(MapFile.DATA_FILE_NAME)) {
+ dataFileFound = true;
+ }
+ }
+ }
+ assertTrue("No files found in " + expectedMapDir, fileCount > 0);
+ assertTrue("Missing MapFile index file in " + expectedMapDir, indexFileFound);
+ assertTrue("Missing MapFile data file in " + expectedMapDir, dataFileFound);
+ }
public void testCommitter() throws Exception {
Job job = Job.getInstance();
@@ -169,6 +207,32 @@ public class TestFileOutputCommitter ext
FileUtil.fullyDelete(new File(outDir.toString()));
}
+ /**
+ * Commits a task and job whose output is a MapFile (a directory of index and
+ * data files rather than a single file) and validates that the directory and
+ * its contents end up under the final output path.
+ */
+ public void testMapFileOutputCommitter() throws Exception {
+ Job job = Job.getInstance();
+ FileOutputFormat.setOutputPath(job, outDir);
+ Configuration conf = job.getConfiguration();
+ conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+ JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
+ FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
+
+ try {
+ // setup
+ committer.setupJob(jContext);
+ committer.setupTask(tContext);
+
+ // write output
+ MapFileOutputFormat theOutputFormat = new MapFileOutputFormat();
+ RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
+ writeMapFileOutput(theRecordWriter, tContext);
+
+ // do commit
+ committer.commitTask(tContext);
+ committer.commitJob(jContext);
+
+ // validate output
+ validateMapFileOutputContent(FileSystem.get(job.getConfiguration()), outDir);
+ } finally {
+ // outDir is shared by every test in this class; clean it up even when a
+ // step above fails so leftovers cannot affect later tests.
+ FileUtil.fullyDelete(new File(outDir.toString()));
+ }
+ }
public void testAbort() throws IOException, InterruptedException {
Job job = Job.getInstance();