You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/06/21 00:34:05 UTC
svn commit: r670086 - in /hadoop/core/trunk: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Author: acmurthy
Date: Fri Jun 20 15:34:04 2008
New Revision: 670086
URL: http://svn.apache.org/viewvc?rev=670086&view=rev
Log:
HADOOP-3598. Ensure that temporary task-output directories are not created if they are not necessary e.g. for Maps with no side-effect files.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun 20 15:34:04 2008
@@ -102,6 +102,10 @@
HADOOP-3512. Separate out the tools into a tools jar. (omalley)
+ HADOOP-3598. Ensure that temporary task-output directories are not created
+ if they are not necessary e.g. for Maps with no side-effect files.
+ (acmurthy)
+
NEW FEATURES
HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -195,5 +195,40 @@
return name == null ? null: new Path(name);
}
+ /**
+ * Helper function to create the task's temporary output directory and
+ * return the path of the file called <code>name</code> inside it.
+ *
+ * @param conf job-configuration
+ * @param name temporary task-output filename
+ * @return path to the task's temporary output file
+ * @throws IOException if the output path is unset, the job temp dir is missing, or mkdirs fails
+ */
+ protected static Path getTaskOutputPath(JobConf conf, String name)
+ throws IOException {
+ // ${mapred.out.dir}
+ Path outputPath = getOutputPath(conf);
+ if (outputPath == null) {
+ throw new IOException("Undefined job output-path");
+ }
+
+ // ${mapred.out.dir}/_temporary -- must already exist; it is not created here
+ Path jobTmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
+ FileSystem fs = jobTmpDir.getFileSystem(conf);
+ if (!fs.exists(jobTmpDir)) {
+ throw new IOException("The temporary job-output directory " +
+ jobTmpDir.toString() + " doesn't exist!");
+ }
+
+ // ${mapred.out.dir}/_temporary/_${taskid} -- created by this call
+ Path taskTmpDir = getWorkOutputPath(conf);
+ if (!fs.mkdirs(taskTmpDir)) {
+ throw new IOException("Mkdirs failed to create "
+ + taskTmpDir.toString());
+ }
+
+ // ${mapred.out.dir}/_temporary/_${taskid}/${name}
+ return new Path(taskTmpDir, name);
+ }
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -42,14 +42,10 @@
public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
-
- Path outputPath = getWorkOutputPath(job);
- FileSystem fs = outputPath.getFileSystem(job);
- if (!fs.exists(outputPath)) {
- throw new IOException("Output directory doesnt exist");
- }
- Path file = new Path(outputPath, name);
+ // get the path of the temporary output file
+ Path file = FileOutputFormat.getTaskOutputPath(job, name);
+ FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -130,13 +130,10 @@
getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
-
- Path outputPath = getWorkOutputPath(job);
- FileSystem fs = outputPath.getFileSystem(job);
- if (!fs.exists(outputPath)) {
- throw new IOException("Output directory doesnt exist");
- }
- Path file = new Path(outputPath, name);
+ // get the path of the temporary output file
+ Path file = FileOutputFormat.getTaskOutputPath(job, name);
+
+ FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -39,13 +39,10 @@
FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
-
- Path outputPath = getWorkOutputPath(job);
- FileSystem fs = outputPath.getFileSystem(job);
- if (!fs.exists(outputPath)) {
- throw new IOException("Output directory doesnt exist");
- }
- Path file = new Path(outputPath, name);
+ // get the path of the temporary output file
+ Path file = FileOutputFormat.getTaskOutputPath(job, name);
+
+ FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Jun 20 15:34:04 2008
@@ -493,6 +493,9 @@
> 0) {
shouldBePromoted = true;
}
+ } else {
+ LOG.info(getTaskID() + ": No outputs to promote from " +
+ taskOutputPath);
}
}
} catch (IOException ioe) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Jun 20 15:34:04 2008
@@ -1458,22 +1458,6 @@
localJobConf.set("mapred.task.id", task.getTaskID().toString());
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
- // create _taskid directory in output path temporary directory.
- Path outputPath = FileOutputFormat.getOutputPath(localJobConf);
- if (outputPath != null) {
- Path jobTmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
- FileSystem fs = jobTmpDir.getFileSystem(localJobConf);
- if (fs.exists(jobTmpDir)) {
- Path taskTmpDir = new Path(jobTmpDir, "_" + task.getTaskID());
- if (!fs.mkdirs(taskTmpDir)) {
- throw new IOException("Mkdirs failed to create "
- + taskTmpDir.toString());
- }
- } else {
- throw new IOException("The directory " + jobTmpDir.toString()
- + " doesnt exist ");
- }
- }
task.localizeConfiguration(localJobConf);
List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -108,16 +108,13 @@
String name,
Progressable progress)
throws IOException {
-
- String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");
- Path dir = getWorkOutputPath(job);
- FileSystem fs = dir.getFileSystem(job);
- if (!fs.exists(dir)) {
- throw new IOException("Output directory doesnt exist");
- }
boolean isCompressed = getCompressOutput(job);
+ String keyValueSeparator = job.get("mapred.textoutputformat.separator",
+ "\t");
if (!isCompressed) {
- FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+ Path file = FileOutputFormat.getTaskOutputPath(job, name);
+ FileSystem fs = file.getFileSystem(job);
+ FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
Class<? extends CompressionCodec> codecClass =
@@ -126,8 +123,11 @@
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, job);
// build the filename including the extension
- Path filename = new Path(dir, name + codec.getDefaultExtension());
- FSDataOutputStream fileOut = fs.create(filename, progress);
+ Path file =
+ FileOutputFormat.getTaskOutputPath(job,
+ name + codec.getDefaultExtension());
+ FileSystem fs = file.getFileSystem(job);
+ FSDataOutputStream fileOut = fs.create(file, progress);
return new LineRecordWriter<K, V>(new DataOutputStream
(codec.createOutputStream(fileOut)),
keyValueSeparator);
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -39,7 +39,10 @@
}
private static Path workDir =
- new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+ new Path(new Path(
+ new Path(System.getProperty("test.build.data", "."),
+ "data"),
+ MRConstants.TEMP_DIR_NAME),
"TestMultipleTextOutputFormat");
private static void writeData(RecordWriter<Text, Text> rw) throws IOException {
@@ -81,6 +84,7 @@
public void testFormat() throws Exception {
JobConf job = new JobConf();
+ FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, workDir);
FileSystem fs = workDir.getFileSystem(job);
if (!fs.mkdirs(workDir)) {
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -38,7 +38,11 @@
public void testBinary() throws IOException {
JobConf job = new JobConf();
FileSystem fs = FileSystem.getLocal(job);
- Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+
+ Path dir =
+ new Path(new Path(new Path(System.getProperty("test.build.data",".")),
+ MRConstants.TEMP_DIR_NAME),
+ "mapred");
Path file = new Path(dir, "testbinary.seq");
Random r = new Random();
long seed = r.nextLong();
@@ -49,6 +53,7 @@
fail("Failed to create output directory");
}
+ FileOutputFormat.setOutputPath(job, dir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, dir);
SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job,
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -36,13 +36,17 @@
}
}
- private static Path workDir =
- new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+ private static Path workDir =
+ new Path(new Path(
+ new Path(System.getProperty("test.build.data", "."),
+ "data"),
+ MRConstants.TEMP_DIR_NAME),
"TestTextOutputFormat");
@SuppressWarnings("unchecked")
public void testFormat() throws Exception {
JobConf job = new JobConf();
+ FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, workDir);
FileSystem fs = workDir.getFileSystem(job);
if (!fs.mkdirs(workDir)) {
@@ -94,6 +98,7 @@
JobConf job = new JobConf();
String separator = "\u0001";
job.set("mapred.textoutputformat.separator", separator);
+ FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, workDir);
FileSystem fs = workDir.getFileSystem(job);
if (!fs.mkdirs(workDir)) {