You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 07:02:00 UTC

svn commit: r1079265 - in /hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop: mapred/FileOutputFormat.java mapred/Task.java mapred/UberTask.java mapreduce/lib/output/FileOutputFormat.java

Author: omalley
Date: Tue Mar  8 06:02:00 2011
New Revision: 1079265

URL: http://svn.apache.org/viewvc?rev=1079265&view=rev
Log:
commit 84cc77c5b16cf8e6261474422702ae200b7e5c6c
Author: Greg Roelofs <ro...@yahoo-inc.com>
Date:   Wed Feb 16 14:27:02 2011 -0800

    Correct fix for output-commit-related bugs:  use subtasks' IDs in context,
    and always do task commit (not just for map-only jobs).  v15-delta patch.

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
    hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java?rev=1079265&r1=1079264&r2=1079265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java Tue Mar  8 06:02:00 2011
@@ -265,9 +265,9 @@ public abstract class FileOutputFormat<K
    * different tasks for the job, the names for different tasks will not collide
    * with each other.</p>
    *
-   * <p>The given name is postfixed with the task type, 'm' for maps, 'r' for
-   * reduces and the task partition number. For example, give a name 'test'
-   * running on the first map o the job the generated name will be
+   * <p>The given name is postfixed with the task type ('m' for maps, 'r' for
+   * reduces) and the task partition number. For example, given a name 'test'
+   * running on the first map of the job, the generated name will be
    * 'test-m-00000'.</p>
    *
    * @param conf the configuration for the job.
@@ -278,10 +278,10 @@ public abstract class FileOutputFormat<K
     int partition = conf.getInt(JobContext.TASK_PARTITION, -1);
     if (partition == -1) {
       throw new IllegalArgumentException(
-        "This method can only be called from within a Job");
+        "This method can be called only from within a Job");
     }
 
-    String taskType = (conf.getBoolean(JobContext.TASK_ISMAP, true)) ? "m" : "r";
+    String taskType = conf.getBoolean(JobContext.TASK_ISMAP, true) ? "m" : "r";
 
     NumberFormat numberFormat = NumberFormat.getInstance();
     numberFormat.setMinimumIntegerDigits(5);
@@ -296,9 +296,9 @@ public abstract class FileOutputFormat<K
    *
    * <p>The path can be used to create custom files from within the map and
    * reduce tasks. The path name will be unique for each task. The path parent
-   * will be the job output directory.</p>ls
+   * will be the job output directory.</p>
    *
-   * <p>This method uses the {@link #getUniqueName} method to make the file name
+   * <p>This method uses the {@link #getUniqueName} method to make the filename
    * unique for the task.</p>
    *
    * @param conf the configuration for the job.

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079265&r1=1079264&r2=1079265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar  8 06:02:00 2011
@@ -498,13 +498,12 @@ abstract public class Task implements Wr
                                                    ClassNotFoundException,
                                                    InterruptedException {
     jobContext = new JobContextImpl(job, id, reporter);
-    // taskIdForUmbilical is required here since it ultimately determines the
-    // name of the pre-commit HDFS working directory (_temporary/_attempt_xxx),
-    // and anything put in an "_m_" directory will fail to get saved (i.e.,
-    // moved up two levels) in UberTask mode.  This mostly (only?) affects
-    // users of HadoopArchives (har) and IndexUpdateOutputFormat (via the
-    // getWorkOutputPath() method in FileOutputFormat).
-    taskContext = new TaskAttemptContextImpl(job, taskIdForUmbilical, reporter);
+    // taskId (rather than taskIdForUmbilical) is required here to avoid
+    // collisions in uber-subtasks' outputs; it ultimately determines both the
+    // name of the pre-commit HDFS working directory (_temporary/_attempt_xxx)
+    // and the (main) filename in that directory (part-[rm]-nnnnn).  UberTask
+    // will handle any tempdir/commit-related details.
+    taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
     if (getState() == TaskStatus.State.UNASSIGNED) {
       setState(TaskStatus.State.RUNNING);
     }
@@ -1065,19 +1064,19 @@ abstract public class Task implements Wr
     commit(umbilical, reporter);
   }
 
-  // this is protected (rather than private) solely for UberTask map-only case
+  // this is protected (rather than private) solely for UberTask
   protected void commit(TaskUmbilicalProtocol umbilical,
                         TaskReporter reporter) throws IOException {
     try {
       LOG.info("Task " + taskId + " is allowed to commit now");
       committer.commitTask(taskContext);
       return;
-    } catch (IOException iee) {
+    } catch (IOException ioe) {
       LOG.warn("Failure committing: " + 
-        StringUtils.stringifyException(iee));
+        StringUtils.stringifyException(ioe));
       // if it couldn't commit successfully then delete the output
       discardOutput(taskContext);
-      throw iee;
+      throw ioe;
     }
   }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java?rev=1079265&r1=1079264&r2=1079265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java Tue Mar  8 06:02:00 2011
@@ -273,7 +273,7 @@ class UberTask extends Task {
       JobConf localConf = new JobConf(job);
       map.localizeConfiguration(localConf);
       map.setConf(localConf);
-      // for reporting purposes (to TT), use our (uber) task ID, not subtask's:
+      // for reporting purposes (to TT), use uber's task ID, not subtask's:
       map.setTaskIdForUmbilical(getTaskID());
 
       // override MapTask's "root" Progress node with our second-level one...
@@ -303,31 +303,24 @@ class UberTask extends Task {
       // no one calls phase() on parent Progress (or get()?) in interim.
       map.getProgress().complete();
 
-      if (numReduceTasks == 0) {
-        // For map-only jobs, we need to save (commit) each map's output, which
-        // usually entails asking the TT for permission (in case of speculation)
-        // and then moving it up two subdirectory levels in HDFS (i.e., out of
-        // _temporary/attempt_xxx).  However, the TT gives permission only if
-        // the JT sent a commitAction for the task, which it hasn't yet done
-        // for UberTask and which it will never do for uber-subtasks of which
-        // it knows nothing.  Therefore we just do the two-subdir thing (and
-        // make sure elsewhere that speculation is never on for UberTasks).
-        // Use UberTask's reporter so we set the progressFlag to which the
-        // communication thread is paying attention; it has no knowledge of
-        // subReporter.
-        map.commit(umbilical, reporter);
-      } else {
-        // For map+reduce or reduce-only jobs, we merely need to signal the
-        // communication thread to pass any progress on up to the TT.  This
-        // and the renameMapOutputForReduce() optimization below are the sole
-        // bits of the commit() method that we actually want/need.
-        reporter.progress();
-      }
+      // Even for M+R jobs, we need to save (commit) each map's output (since
+      // user may create save-worthy side-files in the work/tempdir), which
+      // usually entails asking the TT for permission (because of speculation)
+      // and then moving it up one subdirectory level in HDFS (i.e., out of
+      // _temporary/_attempt_xxx).  However, the TT gives permission only if
+      // the JT sent a commitAction for the task, which it hasn't yet done
+      // for UberTask and which it will never do for uber-subtasks of which
+      // it knows nothing.  Therefore we just do the move ourselves and
+      // make sure elsewhere that speculation is never on for UberTasks.
+      // Use UberTask's reporter so we set the progressFlag to which the
+      // communication thread is paying attention; it has no knowledge of
+      // subReporter.
+      map.commit(umbilical, reporter);  // includes "reporter.progress()"
 
       // Every map will produce "file.out" in the same (local, not HDFS!) dir,
       // so rename to "map_#.out" as we go.  (Longer-term, will use
       // TaskAttemptIDs as part of name => avoid rename.)  Note that this has
-      // nothing to do with the _temporary/attempt_xxx _HDFS_ subdir above!
+      // nothing to do with the _temporary/_attempt_xxx _HDFS_ subdir above!
       if (numReduceTasks > 0) {
         renameMapOutputForReduce(mapIds[j], map.getMapOutputFile());
       }
@@ -372,7 +365,6 @@ class UberTask extends Task {
 
     // note that this is implicitly the "isLocal" branch of ReduceTask run():
     // we don't have a shuffle phase
-    // (=> should skip adding Phase in first place and use 50/50 split? FIXME)
     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
     RawKeyValueIterator rIter =
         Merger.merge(job, rfs, job.getMapOutputKeyClass(),
@@ -405,6 +397,10 @@ class UberTask extends Task {
     reduce.getProgress().complete();
 
     // signal the communication thread to pass any progress on up to the TT
+    // [There's no explicit reduce.commit() because we're reusing ubertask's
+    // ID and temp dir => ubertask's commit() will take care of us.  But if
+    // we ever support more than one reduce, we'll have to do explicit sub-
+    // commit() as with maps above.]
     reporter.progress();
   }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=1079265&r1=1079264&r2=1079265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Tue Mar  8 06:02:00 2011
@@ -180,25 +180,26 @@ public static final String OUTDIR = "map
    * unique names per task-attempt (e.g. using the attemptid, say 
    * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p> 
    * 
-   * <p>To get around this the Map-Reduce framework helps the application-writer 
+   * <p>To get around this the Map-Reduce framework helps the application-writer
    * out by maintaining a special 
    * <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt> 
    * sub-directory for each task-attempt on HDFS where the output of the 
    * task-attempt goes. On successful completion of the task-attempt the files 
-   * in the <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt> (only) 
-   * are <i>promoted</i> to <tt>${mapreduce.output.fileoutputformat.outputdir}</tt>. Of course, the 
+   * in the <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt>
+   * (only) are <i>promoted</i> to
+   * <tt>${mapreduce.output.fileoutputformat.outputdir}</tt>. Of course, the 
    * framework discards the sub-directory of unsuccessful task-attempts. This 
    * is completely transparent to the application.</p>
    * 
    * <p>The application-writer can take advantage of this by creating any 
    * side-files required in a work directory during execution 
-   * of his task i.e. via 
+   * of his task, i.e., via 
    * {@link #getWorkOutputPath(TaskInputOutputContext)}, and
    * the framework will move them out similarly - thus she doesn't have to pick 
    * unique paths per task-attempt.</p>
    * 
    * <p>The entire discussion holds true for maps of jobs with 
-   * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, 
+   * reducer=NONE (i.e., 0 reduces) since output of the map, in that case, 
    * goes directly to HDFS.</p> 
    * 
    * @return the {@link Path} to the task's temporary output directory 
@@ -223,10 +224,14 @@ public static final String OUTDIR = "map
    * <p>This method uses the {@link #getUniqueFile} method to make the file name
    * unique for the task.</p>
    *
+   * <p>See also {@link #getDefaultWorkFile}.</p>
+   *
    * @param context the context for the task.
    * @param name the name for the file.
    * @param extension the extension for the file
    * @return a unique path accross all tasks of the job.
+   *         More specifically, the result is a full path; e.g.,
+   *         ${output}/_temporary/_${taskid}/${name}-[mrsct]-${partition}${ext}
    */
   public 
   static Path getPathForWorkFile(TaskInputOutputContext<?,?,?,?> context, 
@@ -238,11 +243,12 @@ public static final String OUTDIR = "map
   }
 
   /**
-   * Generate a unique filename, based on the task id, name, and extension
+   * Generate a unique filename, based on the base name, task id (both task
+   * type and partition), and optional extension.
    * @param context the task that is calling this
    * @param name the base filename
    * @param extension the filename extension
-   * @return a string like $name-[mrsct]-$id$extension
+   * @return a string like ${name}-[mrsct]-${partition}${extension}
    */
   public synchronized static String getUniqueFile(TaskAttemptContext context,
                                                   String name,
@@ -261,10 +267,13 @@ public static final String OUTDIR = "map
   }
 
   /**
-   * Get the default path and filename for the output format.
+   * Get the default working path and filename for the output format.
+   *
+   * <p>See also {@link #getPathForWorkFile}.</p>
+   *
    * @param context the task context
    * @param extension an extension to add to the filename
-   * @return a full path $output/_temporary/$taskid/part-[mr]-$id
+   * @return a full path ${output}/_temporary/_${taskid}/part-[mr]-${part}${ext}
    * @throws IOException
    */
   public Path getDefaultWorkFile(TaskAttemptContext context,