You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 07:02:00 UTC
svn commit: r1079265 - in
/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop:
mapred/FileOutputFormat.java mapred/Task.java mapred/UberTask.java
mapreduce/lib/output/FileOutputFormat.java
Author: omalley
Date: Tue Mar 8 06:02:00 2011
New Revision: 1079265
URL: http://svn.apache.org/viewvc?rev=1079265&view=rev
Log:
commit 84cc77c5b16cf8e6261474422702ae200b7e5c6c
Author: Greg Roelofs <ro...@yahoo-inc.com>
Date: Wed Feb 16 14:27:02 2011 -0800
Correct fix for output-commit-related bugs: use subtasks' IDs in context,
and always do task commit (not just for map-only jobs). v15-delta patch.
Modified:
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java?rev=1079265&r1=1079264&r2=1079265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/FileOutputFormat.java Tue Mar 8 06:02:00 2011
@@ -265,9 +265,9 @@ public abstract class FileOutputFormat<K
* different tasks for the job, the names for different tasks will not collide
* with each other.</p>
*
- * <p>The given name is postfixed with the task type, 'm' for maps, 'r' for
- * reduces and the task partition number. For example, give a name 'test'
- * running on the first map o the job the generated name will be
+ * <p>The given name is postfixed with the task type ('m' for maps, 'r' for
+ * reduces) and the task partition number. For example, given a name 'test'
+ * running on the first map of the job, the generated name will be
* 'test-m-00000'.</p>
*
* @param conf the configuration for the job.
@@ -278,10 +278,10 @@ public abstract class FileOutputFormat<K
int partition = conf.getInt(JobContext.TASK_PARTITION, -1);
if (partition == -1) {
throw new IllegalArgumentException(
- "This method can only be called from within a Job");
+ "This method can be called only from within a Job");
}
- String taskType = (conf.getBoolean(JobContext.TASK_ISMAP, true)) ? "m" : "r";
+ String taskType = conf.getBoolean(JobContext.TASK_ISMAP, true) ? "m" : "r";
NumberFormat numberFormat = NumberFormat.getInstance();
numberFormat.setMinimumIntegerDigits(5);
@@ -296,9 +296,9 @@ public abstract class FileOutputFormat<K
*
* <p>The path can be used to create custom files from within the map and
* reduce tasks. The path name will be unique for each task. The path parent
- * will be the job output directory.</p>ls
+ * will be the job output directory.</p>
*
- * <p>This method uses the {@link #getUniqueName} method to make the file name
+ * <p>This method uses the {@link #getUniqueName} method to make the filename
* unique for the task.</p>
*
* @param conf the configuration for the job.
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079265&r1=1079264&r2=1079265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar 8 06:02:00 2011
@@ -498,13 +498,12 @@ abstract public class Task implements Wr
ClassNotFoundException,
InterruptedException {
jobContext = new JobContextImpl(job, id, reporter);
- // taskIdForUmbilical is required here since it ultimately determines the
- // name of the pre-commit HDFS working directory (_temporary/_attempt_xxx),
- // and anything put in an "_m_" directory will fail to get saved (i.e.,
- // moved up two levels) in UberTask mode. This mostly (only?) affects
- // users of HadoopArchives (har) and IndexUpdateOutputFormat (via the
- // getWorkOutputPath() method in FileOutputFormat).
- taskContext = new TaskAttemptContextImpl(job, taskIdForUmbilical, reporter);
+ // taskId (rather than taskIdForUmbilical) is required here to avoid
+ // collisions in uber-subtasks' outputs; it ultimately determines both the
+ // name of the pre-commit HDFS working directory (_temporary/_attempt_xxx)
+ // and the (main) filename in that directory (part-[rm]-nnnnn). UberTask
+ // will handle any tempdir/commit-related details.
+ taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
if (getState() == TaskStatus.State.UNASSIGNED) {
setState(TaskStatus.State.RUNNING);
}
@@ -1065,19 +1064,19 @@ abstract public class Task implements Wr
commit(umbilical, reporter);
}
- // this is protected (rather than private) solely for UberTask map-only case
+ // this is protected (rather than private) solely for UberTask
protected void commit(TaskUmbilicalProtocol umbilical,
TaskReporter reporter) throws IOException {
try {
LOG.info("Task " + taskId + " is allowed to commit now");
committer.commitTask(taskContext);
return;
- } catch (IOException iee) {
+ } catch (IOException ioe) {
LOG.warn("Failure committing: " +
- StringUtils.stringifyException(iee));
+ StringUtils.stringifyException(ioe));
// if it couldn't commit successfully then delete the output
discardOutput(taskContext);
- throw iee;
+ throw ioe;
}
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java?rev=1079265&r1=1079264&r2=1079265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/UberTask.java Tue Mar 8 06:02:00 2011
@@ -273,7 +273,7 @@ class UberTask extends Task {
JobConf localConf = new JobConf(job);
map.localizeConfiguration(localConf);
map.setConf(localConf);
- // for reporting purposes (to TT), use our (uber) task ID, not subtask's:
+ // for reporting purposes (to TT), use uber's task ID, not subtask's:
map.setTaskIdForUmbilical(getTaskID());
// override MapTask's "root" Progress node with our second-level one...
@@ -303,31 +303,24 @@ class UberTask extends Task {
// no one calls phase() on parent Progress (or get()?) in interim.
map.getProgress().complete();
- if (numReduceTasks == 0) {
- // For map-only jobs, we need to save (commit) each map's output, which
- // usually entails asking the TT for permission (in case of speculation)
- // and then moving it up two subdirectory levels in HDFS (i.e., out of
- // _temporary/attempt_xxx). However, the TT gives permission only if
- // the JT sent a commitAction for the task, which it hasn't yet done
- // for UberTask and which it will never do for uber-subtasks of which
- // it knows nothing. Therefore we just do the two-subdir thing (and
- // make sure elsewhere that speculation is never on for UberTasks).
- // Use UberTask's reporter so we set the progressFlag to which the
- // communication thread is paying attention; it has no knowledge of
- // subReporter.
- map.commit(umbilical, reporter);
- } else {
- // For map+reduce or reduce-only jobs, we merely need to signal the
- // communication thread to pass any progress on up to the TT. This
- // and the renameMapOutputForReduce() optimization below are the sole
- // bits of the commit() method that we actually want/need.
- reporter.progress();
- }
+ // Even for M+R jobs, we need to save (commit) each map's output (since
+ // user may create save-worthy side-files in the work/tempdir), which
+ // usually entails asking the TT for permission (because of speculation)
+ // and then moving it up two subdirectory levels in HDFS (i.e., out of
+ // _temporary/_attempt_xxx). However, the TT gives permission only if
+ // the JT sent a commitAction for the task, which it hasn't yet done
+ // for UberTask and which it will never do for uber-subtasks of which
+ // it knows nothing. Therefore we just do the two-subdir thing and
+ // make sure elsewhere that speculation is never on for UberTasks.
+ // Use UberTask's reporter so we set the progressFlag to which the
+ // communication thread is paying attention; it has no knowledge of
+ // subReporter.
+ map.commit(umbilical, reporter); // includes "reporter.progress()"
// Every map will produce "file.out" in the same (local, not HDFS!) dir,
// so rename to "map_#.out" as we go. (Longer-term, will use
// TaskAttemptIDs as part of name => avoid rename.) Note that this has
- // nothing to do with the _temporary/attempt_xxx _HDFS_ subdir above!
+ // nothing to do with the _temporary/_attempt_xxx _HDFS_ subdir above!
if (numReduceTasks > 0) {
renameMapOutputForReduce(mapIds[j], map.getMapOutputFile());
}
@@ -372,7 +365,6 @@ class UberTask extends Task {
// note that this is implicitly the "isLocal" branch of ReduceTask run():
// we don't have a shuffle phase
- // (=> should skip adding Phase in first place and use 50/50 split? FIXME)
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
RawKeyValueIterator rIter =
Merger.merge(job, rfs, job.getMapOutputKeyClass(),
@@ -405,6 +397,10 @@ class UberTask extends Task {
reduce.getProgress().complete();
// signal the communication thread to pass any progress on up to the TT
+ // [There's no explicit reduce.commit() because we're reusing ubertask's
+ // ID and temp dir => ubertask's commit() will take care of us. But if
+ // we ever support more than one reduce, we'll have to do explicit sub-
+ // commit() as with maps above.]
reporter.progress();
}
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java?rev=1079265&r1=1079264&r2=1079265&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java Tue Mar 8 06:02:00 2011
@@ -180,25 +180,26 @@ public static final String OUTDIR = "map
* unique names per task-attempt (e.g. using the attemptid, say
* <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p>
*
- * <p>To get around this the Map-Reduce framework helps the application-writer
+ * <p>To get around this the Map-Reduce framework helps the application-writer
* out by maintaining a special
* <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt>
* sub-directory for each task-attempt on HDFS where the output of the
* task-attempt goes. On successful completion of the task-attempt the files
- * in the <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt> (only)
- * are <i>promoted</i> to <tt>${mapreduce.output.fileoutputformat.outputdir}</tt>. Of course, the
+ * in the <tt>${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid}</tt>
+ * (only) are <i>promoted</i> to
+ * <tt>${mapreduce.output.fileoutputformat.outputdir}</tt>. Of course, the
* framework discards the sub-directory of unsuccessful task-attempts. This
* is completely transparent to the application.</p>
*
* <p>The application-writer can take advantage of this by creating any
* side-files required in a work directory during execution
- * of his task i.e. via
+ * of his task, i.e., via
* {@link #getWorkOutputPath(TaskInputOutputContext)}, and
* the framework will move them out similarly - thus she doesn't have to pick
* unique paths per task-attempt.</p>
*
* <p>The entire discussion holds true for maps of jobs with
- * reducer=NONE (i.e. 0 reduces) since output of the map, in that case,
+ * reducer=NONE (i.e., 0 reduces) since output of the map, in that case,
* goes directly to HDFS.</p>
*
* @return the {@link Path} to the task's temporary output directory
@@ -223,10 +224,14 @@ public static final String OUTDIR = "map
* <p>This method uses the {@link #getUniqueFile} method to make the file name
* unique for the task.</p>
*
+ * <p>See also {@link #getDefaultWorkFile}.</p>
+ *
* @param context the context for the task.
* @param name the name for the file.
* @param extension the extension for the file
* @return a unique path accross all tasks of the job.
+ * @return a full path, unique across all tasks of the job; e.g.,
+ * ${output}/_temporary/_${taskid}/${name}-[mrsct]-${partition}${ext}
*/
public
static Path getPathForWorkFile(TaskInputOutputContext<?,?,?,?> context,
@@ -238,11 +243,12 @@ public static final String OUTDIR = "map
}
/**
- * Generate a unique filename, based on the task id, name, and extension
+ * Generate a unique filename, based on the base name, task id (both task
+ * type and partition), and optional extension.
* @param context the task that is calling this
* @param name the base filename
* @param extension the filename extension
- * @return a string like $name-[mrsct]-$id$extension
+ * @return a string like ${name}-[mrsct]-${partition}${extension}
*/
public synchronized static String getUniqueFile(TaskAttemptContext context,
String name,
@@ -261,10 +267,13 @@ public static final String OUTDIR = "map
}
/**
- * Get the default path and filename for the output format.
+ * Get the default working path and filename for the output format.
+ *
+ * <p>See also {@link #getPathForWorkFile}.</p>
+ *
* @param context the task context
* @param extension an extension to add to the filename
- * @return a full path $output/_temporary/$taskid/part-[mr]-$id
+ * @return a full path ${output}/_temporary/_${taskid}/part-[mr]-${part}${ext}
* @throws IOException
*/
public Path getDefaultWorkFile(TaskAttemptContext context,