You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/06/21 00:34:05 UTC

svn commit: r670086 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: acmurthy
Date: Fri Jun 20 15:34:04 2008
New Revision: 670086

URL: http://svn.apache.org/viewvc?rev=670086&view=rev
Log:
HADOOP-3598. Ensure that temporary task-output directories are not created if they are not necessary, e.g. for Maps with no side-effect files.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jun 20 15:34:04 2008
@@ -102,6 +102,10 @@
 
     HADOOP-3512. Separate out the tools into a tools jar. (omalley)
 
+    HADOOP-3598. Ensure that temporary task-output directories are not created
+    if they are not necessary e.g. for Maps with no side-effect files.
+    (acmurthy)
+
   NEW FEATURES
 
     HADOOP-3074. Provides a UrlStreamHandler for DFS and other FS,

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -195,5 +195,40 @@
     return name == null ? null: new Path(name);
   }
 
+  /**
+   * Helper function to create the task's temporary output directory and 
+   * return the path to the task's output file.
+   * 
+   * @param conf job-configuration
+   * @param name temporary task-output filename
+   * @return path to the task's temporary output file
+   * @throws IOException if the job output-path is undefined, the temporary job-output directory does not exist, or the task's temporary directory cannot be created
+   */
+  protected static Path getTaskOutputPath(JobConf conf, String name) 
+  throws IOException {
+    // ${mapred.out.dir}
+    Path outputPath = getOutputPath(conf);
+    if (outputPath == null) {
+      throw new IOException("Undefined job output-path");
+    }
+
+    // ${mapred.out.dir}/_temporary
+    Path jobTmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
+    FileSystem fs = jobTmpDir.getFileSystem(conf);
+    if (!fs.exists(jobTmpDir)) {
+      throw new IOException("The temporary job-output directory " + 
+          jobTmpDir.toString() + " doesn't exist!"); 
+    }
+    // NOTE(review): getWorkOutputPath(conf) may return null if no work output path is set; fs.mkdirs would then fail with an NPE rather than an IOException -- confirm callers always set it
+    // ${mapred.out.dir}/_temporary/_${taskid}
+    Path taskTmpDir = getWorkOutputPath(conf);
+    if (!fs.mkdirs(taskTmpDir)) {
+      throw new IOException("Mkdirs failed to create " 
+          + taskTmpDir.toString());
+    }
+    
+    // ${mapred.out.dir}/_temporary/_${taskid}/${name}
+    return new Path(taskTmpDir, name);
+  } 
 }
 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/MapFileOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -42,14 +42,10 @@
   public RecordWriter<WritableComparable, Writable> getRecordWriter(FileSystem ignored, JobConf job,
                                       String name, Progressable progress)
     throws IOException {
-
-    Path outputPath = getWorkOutputPath(job);
-    FileSystem fs = outputPath.getFileSystem(job);
-    if (!fs.exists(outputPath)) {
-      throw new IOException("Output directory doesnt exist");
-    }
-    Path file = new Path(outputPath, name);
+    // get the path of the temporary output file 
+    Path file = FileOutputFormat.getTaskOutputPath(job, name);
     
+    FileSystem fs = file.getFileSystem(job);
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(job)) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileAsBinaryOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -130,13 +130,10 @@
              getRecordWriter(FileSystem ignored, JobConf job,
                              String name, Progressable progress)
     throws IOException {
-
-    Path outputPath = getWorkOutputPath(job);
-    FileSystem fs = outputPath.getFileSystem(job);
-    if (!fs.exists(outputPath)) {
-      throw new IOException("Output directory doesnt exist");
-    }
-    Path file = new Path(outputPath, name);
+    // get the path of the temporary output file 
+    Path file = FileOutputFormat.getTaskOutputPath(job, name);
+    
+    FileSystem fs = file.getFileSystem(job);
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(job)) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -39,13 +39,10 @@
                                           FileSystem ignored, JobConf job,
                                           String name, Progressable progress)
     throws IOException {
-
-    Path outputPath = getWorkOutputPath(job);
-    FileSystem fs = outputPath.getFileSystem(job);
-    if (!fs.exists(outputPath)) {
-      throw new IOException("Output directory doesnt exist");
-    }
-    Path file = new Path(outputPath, name);
+    // get the path of the temporary output file 
+    Path file = FileOutputFormat.getTaskOutputPath(job, name);
+    
+    FileSystem fs = file.getFileSystem(job);
     CompressionCodec codec = null;
     CompressionType compressionType = CompressionType.NONE;
     if (getCompressOutput(job)) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Jun 20 15:34:04 2008
@@ -493,6 +493,9 @@
                       > 0) {
                 shouldBePromoted = true;
               }
+            } else {
+              LOG.info(getTaskID() + ": No outputs to promote from " + 
+                       taskOutputPath);
             }
           }
         } catch (IOException ioe) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Jun 20 15:34:04 2008
@@ -1458,22 +1458,6 @@
       localJobConf.set("mapred.task.id", task.getTaskID().toString());
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
 
-      // create _taskid directory in output path temporary directory.
-      Path outputPath = FileOutputFormat.getOutputPath(localJobConf);
-      if (outputPath != null) {
-        Path jobTmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
-        FileSystem fs = jobTmpDir.getFileSystem(localJobConf);
-        if (fs.exists(jobTmpDir)) {
-          Path taskTmpDir = new Path(jobTmpDir, "_" + task.getTaskID());
-          if (!fs.mkdirs(taskTmpDir)) {
-            throw new IOException("Mkdirs failed to create " 
-                                 + taskTmpDir.toString());
-          }
-        } else {
-          throw new IOException("The directory " + jobTmpDir.toString()
-                                 + " doesnt exist "); 
-        }
-      }
       task.localizeConfiguration(localJobConf);
       
       List<String[]> staticResolutions = NetUtils.getAllStaticResolutions();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TextOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -108,16 +108,13 @@
                                                   String name,
                                                   Progressable progress)
     throws IOException {
-
-    String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t");
-    Path dir = getWorkOutputPath(job);
-    FileSystem fs = dir.getFileSystem(job);
-    if (!fs.exists(dir)) {
-      throw new IOException("Output directory doesnt exist");
-    }
     boolean isCompressed = getCompressOutput(job);
+    String keyValueSeparator = job.get("mapred.textoutputformat.separator", 
+                                       "\t");
     if (!isCompressed) {
-      FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
+      Path file = FileOutputFormat.getTaskOutputPath(job, name);
+      FileSystem fs = file.getFileSystem(job);
+      FSDataOutputStream fileOut = fs.create(file, progress);
       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
     } else {
       Class<? extends CompressionCodec> codecClass =
@@ -126,8 +123,11 @@
       CompressionCodec codec = (CompressionCodec)
         ReflectionUtils.newInstance(codecClass, job);
       // build the filename including the extension
-      Path filename = new Path(dir, name + codec.getDefaultExtension());
-      FSDataOutputStream fileOut = fs.create(filename, progress);
+      Path file = 
+        FileOutputFormat.getTaskOutputPath(job, 
+                                           name + codec.getDefaultExtension());
+      FileSystem fs = file.getFileSystem(job);
+      FSDataOutputStream fileOut = fs.create(file, progress);
       return new LineRecordWriter<K, V>(new DataOutputStream
                                         (codec.createOutputStream(fileOut)),
                                         keyValueSeparator);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -39,7 +39,10 @@
   }
 
   private static Path workDir = 
-    new Path(new Path(System.getProperty("test.build.data", "."), "data"), 
+    new Path(new Path(
+                      new Path(System.getProperty("test.build.data", "."), 
+                               "data"), 
+                      MRConstants.TEMP_DIR_NAME), 
              "TestMultipleTextOutputFormat");
 
   private static void writeData(RecordWriter<Text, Text> rw) throws IOException {
@@ -81,6 +84,7 @@
   
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
+    FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);
     if (!fs.mkdirs(workDir)) {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -38,7 +38,11 @@
   public void testBinary() throws IOException {
     JobConf job = new JobConf();
     FileSystem fs = FileSystem.getLocal(job);
-    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    
+    Path dir = 
+      new Path(new Path(new Path(System.getProperty("test.build.data",".")), 
+                        MRConstants.TEMP_DIR_NAME),
+               "mapred");
     Path file = new Path(dir, "testbinary.seq");
     Random r = new Random();
     long seed = r.nextLong();
@@ -49,6 +53,7 @@
       fail("Failed to create output directory");
     }
 
+    FileOutputFormat.setOutputPath(job, dir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, dir);
 
     SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=670086&r1=670085&r2=670086&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Fri Jun 20 15:34:04 2008
@@ -36,13 +36,17 @@
     }
   }
 
-  private static Path workDir =
-    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+  private static Path workDir = 
+    new Path(new Path(
+                      new Path(System.getProperty("test.build.data", "."), 
+                               "data"), 
+                      MRConstants.TEMP_DIR_NAME), 
              "TestTextOutputFormat");
 
   @SuppressWarnings("unchecked")
   public void testFormat() throws Exception {
     JobConf job = new JobConf();
+    FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);
     if (!fs.mkdirs(workDir)) {
@@ -94,6 +98,7 @@
     JobConf job = new JobConf();
     String separator = "\u0001";
     job.set("mapred.textoutputformat.separator", separator);
+    FileOutputFormat.setOutputPath(job, workDir.getParent().getParent());
     FileOutputFormat.setWorkOutputPath(job, workDir);
     FileSystem fs = workDir.getFileSystem(job);
     if (!fs.mkdirs(workDir)) {