Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/10/24 23:24:53 UTC
svn commit: r588035 [2/3] - in /lucene/hadoop/branches/branch-0.15: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/java/org/apache/hadoop/conf/ src/java/org/apache/hadoop/filecache/
src/java/org/apache/hadoop/io/ src/java/org/apache...
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobConf.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobConf.java Wed Oct 24 14:24:51 2007
@@ -31,6 +31,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
@@ -43,11 +45,63 @@
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Tool;
-/** A map/reduce job configuration. This names the {@link Mapper}, combiner
- * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat}, and
- * {@link OutputFormat} implementations to be used. It also indicates the set
- * of input files, and where the output files should be written. */
+/**
+ * A map/reduce job configuration.
+ *
+ * <p><code>JobConf</code> is the primary interface for a user to describe a
+ * map-reduce job to the Hadoop framework for execution. The framework tries to
+ * faithfully execute the job as described by <code>JobConf</code>; however:
+ * <ol>
+ * <li>
+ * Some configuration parameters might have been marked as
+ * <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
+ * final</a> by administrators and hence cannot be altered.
+ * </li>
+ * <li>
+ * While some job parameters are straightforward to set
+ * (e.g. {@link #setNumReduceTasks(int)}), others interact subtly with the
+ * rest of the framework and/or the job configuration and are relatively more
+ * complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}).
+ * </li>
+ * </ol></p>
+ *
+ * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner
+ * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
+ * {@link OutputFormat} implementations to be used etc. It also indicates the
+ * set of input files ({@link #setInputPath(Path)}/{@link #addInputPath(Path)}),
+ * and where the output files should be written ({@link #setOutputPath(Path)}).
+ *
+ * <p>Optionally <code>JobConf</code> is used to specify other advanced facets
+ * of the job such as <code>Comparator</code>s to be used, files to be put in
+ * the {@link DistributedCache}, whether or not intermediate and/or job outputs
+ * are to be compressed (and how) etc.</p>
+ *
+ * <p>Here is an example of how to configure a job via <code>JobConf</code>:</p>
+ * <p><blockquote><pre>
+ * // Create a new JobConf
+ * JobConf job = new JobConf(new Configuration(), MyJob.class);
+ *
+ * // Specify various job-specific parameters
+ * job.setJobName("myjob");
+ *
+ * job.setInputPath(new Path("in"));
+ * job.setOutputPath(new Path("out"));
+ *
+ * job.setMapperClass(MyJob.MyMapper.class);
+ * job.setCombinerClass(MyJob.MyReducer.class);
+ * job.setReducerClass(MyJob.MyReducer.class);
+ *
+ * job.setInputFormat(SequenceFileInputFormat.class);
+ * job.setOutputFormat(SequenceFileOutputFormat.class);
+ * </pre></blockquote></p>
+ *
+ * @see JobClient
+ * @see ClusterStatus
+ * @see Tool
+ * @see DistributedCache
+ */
public class JobConf extends Configuration {
private static final Log LOG = LogFactory.getLog(JobConf.class);
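
To complement the configuration example in the new class javadoc, here is a
minimal sketch (not part of this patch) of how such a job is typically
submitted; JobClient.runJob(JobConf) blocks until the job completes:

    JobConf job = new JobConf(new Configuration(), MyJob.class);
    job.setJobName("myjob");
    // ... set paths, mapper, combiner, reducer etc. as in the javadoc ...
    RunningJob running = JobClient.runJob(job);  // submit and wait
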
@@ -61,6 +115,7 @@
/**
* Construct a map/reduce job configuration.
+ *
* @param exampleClass a class whose containing jar is used as the job's jar.
*/
public JobConf(Class exampleClass) {
@@ -92,7 +147,7 @@
/** Construct a map/reduce configuration.
*
- * @param config a Configuration-format XML job description file
+ * @param config a Configuration-format XML job description file.
*/
public JobConf(String config) {
this(new Path(config));
@@ -100,7 +155,7 @@
/** Construct a map/reduce configuration.
*
- * @param config a Configuration-format XML job description file
+ * @param config a Configuration-format XML job description file.
*/
public JobConf(Path config) {
super();
@@ -111,6 +166,7 @@
/**
* Checks if <b>mapred-default.xml</b> is on the CLASSPATH and, if so,
* warns the user and loads it as a {@link Configuration} resource.
+ *
* @deprecated Remove in hadoop-0.16.0 via HADOOP-1843
*/
private void checkWarnAndLoadMapredDefault() {
@@ -122,12 +178,24 @@
}
}
+ /**
+ * Get the user jar for the map-reduce job.
+ *
+ * @return the user jar for the map-reduce job.
+ */
public String getJar() { return get("mapred.jar"); }
+
+ /**
+ * Set the user jar for the map-reduce job.
+ *
+ * @param jar the user jar for the map-reduce job.
+ */
public void setJar(String jar) { set("mapred.jar", jar); }
/**
* Set the job's jar file by finding an example class location.
- * @param cls the example class
+ *
+ * @param cls the example class.
*/
public void setJarByClass(Class cls) {
String jar = findContainingJar(cls);
@@ -136,6 +204,11 @@
}
}
+ /**
+ * Get the system directory where job-specific files are to be placed.
+ *
+ * @return the system directory where job-specific files are to be placed.
+ */
public Path getSystemDir() {
return new Path(get("mapred.system.dir", "/tmp/hadoop/mapred/system"));
}
@@ -158,23 +231,41 @@
}
}
- /** Constructs a local file name. Files are distributed among configured
- * local directories.*/
+ /**
+ * Constructs a local file name. Files are distributed among configured
+ * local directories.
+ */
public Path getLocalPath(String pathString) throws IOException {
return getLocalPath("mapred.local.dir", pathString);
}
+ /**
+ * Set the {@link Path} of the input directory for the map-reduce job.
+ *
+ * @param dir the {@link Path} of the input directory for the map-reduce job.
+ */
public void setInputPath(Path dir) {
dir = new Path(getWorkingDirectory(), dir);
set("mapred.input.dir", dir.toString());
}
+ /**
+ * Add a {@link Path} to the list of inputs for the map-reduce job.
+ *
+ * @param dir {@link Path} to be added to the list of inputs for
+ * the map-reduce job.
+ */
public void addInputPath(Path dir) {
dir = new Path(getWorkingDirectory(), dir);
String dirs = get("mapred.input.dir");
set("mapred.input.dir", dirs == null ? dir.toString() : dirs + "," + dir);
}
+ /**
+ * Get the list of input {@link Path}s for the map-reduce job.
+ *
+ * @return the list of input {@link Path}s for the map-reduce job.
+ */
public Path[] getInputPaths() {
String dirs = get("mapred.input.dir", "");
ArrayList list = Collections.list(new StringTokenizer(dirs, ","));
@@ -187,6 +278,7 @@
/**
* Get the reported username for this job.
+ *
* @return the username
*/
public String getUser() {
@@ -195,7 +287,8 @@
/**
* Set the reported username for this job.
- * @param user the username
+ *
+ * @param user the username for this job.
*/
public void setUser(String user) {
set("user.name", user);
@@ -206,6 +299,10 @@
/**
* Set whether the framework should keep the intermediate files for
* failed tasks.
+ *
+ * @param keep <code>true</code> if framework should keep the intermediate files
+ * for failed tasks, <code>false</code> otherwise.
+ *
*/
public void setKeepFailedTaskFiles(boolean keep) {
setBoolean("keep.failed.task.files", keep);
@@ -213,6 +310,7 @@
/**
* Should the temporary files for failed tasks be kept?
+ *
* @return should the files be kept?
*/
public boolean getKeepFailedTaskFiles() {
@@ -223,6 +321,7 @@
* Set a regular expression for task names that should be kept.
* The regular expression ".*_m_000123_0" would keep the files
* for the first instance of map 123 that ran.
+ *
* @param pattern the java.util.regex.Pattern to match against the
* task names.
*/
@@ -233,15 +332,17 @@
/**
* Get the regular expression that is matched against the task names
* to see if we need to keep the files.
- * @return the pattern as a string, if it was set, othewise null
+ *
+ * @return the pattern as a string, if it was set, otherwise null.
*/
public String getKeepTaskFilesPattern() {
return get("keep.task.files.pattern");
}
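
For illustration, the pattern quoted in the javadoc above in use (a sketch;
MyJob is a hypothetical job class):

    JobConf job = new JobConf(MyJob.class);
    job.setKeepTaskFilesPattern(".*_m_000123_0");  // keep files of map 123's first attempt
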
/**
- * Set the current working directory for the default file system
- * @param dir the new current working directory
+ * Set the current working directory for the default file system.
+ *
+ * @param dir the new current working directory.
*/
public void setWorkingDirectory(Path dir) {
dir = new Path(getWorkingDirectory(), dir);
@@ -250,7 +351,8 @@
/**
* Get the current working directory for the default file system.
- * @return the directory name
+ *
+ * @return the directory name.
*/
public Path getWorkingDirectory() {
String name = get("mapred.working.dir");
@@ -267,31 +369,107 @@
}
}
+ /**
+ * Get the {@link Path} to the output directory for the map-reduce job.
+ *
+ * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
+ *
+ * <p>Some applications need to create/write-to side-files, which differ from
+ * the actual job-outputs.
+ *
+ * <p>In such cases there could be issues with 2 instances of the same TIP
+ * (running simultaneously e.g. speculative tasks) trying to open/write-to the
+ * same file (path) on HDFS. Hence the application-writer will have to pick
+ * unique names per task-attempt (e.g. using the taskid, say
+ * <tt>task_200709221812_0001_m_000000_0</tt>), not just per TIP.</p>
+ *
+ * <p>To get around this the Map-Reduce framework helps the application-writer
+ * out by maintaining a special <tt>${mapred.output.dir}/_${taskid}</tt>
+ * sub-directory for each task-attempt on HDFS where the output of the
+ * task-attempt goes. On successful completion of the task-attempt the files
+ * in the <tt>${mapred.output.dir}/_${taskid}</tt> (only)
+ * are <i>promoted</i> to <tt>${mapred.output.dir}</tt>. Of course, the
+ * framework discards the sub-directory of unsuccessful task-attempts. This
+ * is completely transparent to the application.</p>
+ *
+ * <p>The application-writer can take advantage of this by creating any
+ * side-files required in <tt>${mapred.output.dir}</tt> during execution of a
+ * reduce-task, i.e. via {@link #getOutputPath()}, and the framework will move
+ * them out similarly - thus the application doesn't have to pick unique paths per
+ * task-attempt.</p>
+ *
+ * <p><i>Note</i>: the value of <tt>${mapred.output.dir}</tt> during execution
+ * of a particular task-attempt is actually
+ * <tt>${mapred.output.dir}/_${taskid}</tt>, not the value set by
+ * {@link #setOutputPath(Path)}. So, just create any side-files in the path
+ * returned by {@link #getOutputPath()} from map/reduce task to take
+ * advantage of this feature.</p>
+ *
+ * <p>The entire discussion holds true for maps of jobs with
+ * reducer=NONE (i.e. 0 reduces) since output of the map, in that case,
+ * goes directly to HDFS.</p>
+ *
+ * @return the {@link Path} to the output directory for the map-reduce job.
+ */
public Path getOutputPath() {
String name = get("mapred.output.dir");
return name == null ? null: new Path(name);
}
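
A hedged sketch of the side-effect files pattern described above; the class
name and file name are illustrative, not part of this patch:

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    public class SideFileReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {
      private FSDataOutputStream sideFile;

      public void configure(JobConf job) {
        try {
          FileSystem fs = FileSystem.get(job);
          // During execution getOutputPath() resolves under
          // ${mapred.output.dir}/_${taskid}, so this file is promoted to
          // ${mapred.output.dir} only if the task-attempt succeeds.
          sideFile = fs.create(new Path(job.getOutputPath(), "side-data"));
        } catch (IOException ioe) {
          throw new RuntimeException(ioe);
        }
      }

      public void reduce(Text key, Iterator<Text> values,
                         OutputCollector<Text, Text> output, Reporter reporter)
          throws IOException {
        while (values.hasNext()) {
          Text val = values.next();
          sideFile.writeBytes(key + "\t" + val + "\n");  // the side output
          output.collect(key, val);                      // the real output
        }
      }

      public void close() throws IOException {
        sideFile.close();
      }
    }
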
+ /**
+ * Set the {@link Path} of the output directory for the map-reduce job.
+ *
+ * <p><i>Note</i>: See the discussion of tasks' side-effect files in
+ * {@link #getOutputPath()}.</p>
+ *
+ * @param dir the {@link Path} of the output directory for the map-reduce job.
+ */
public void setOutputPath(Path dir) {
dir = new Path(getWorkingDirectory(), dir);
set("mapred.output.dir", dir.toString());
}
+ /**
+ * Get the {@link InputFormat} implementation for the map-reduce job,
+ * defaults to {@link TextInputFormat} if not specified explicitly.
+ *
+ * @return the {@link InputFormat} implementation for the map-reduce job.
+ */
public InputFormat getInputFormat() {
return (InputFormat)ReflectionUtils.newInstance(getClass("mapred.input.format.class",
TextInputFormat.class,
InputFormat.class),
this);
}
+
+ /**
+ * Set the {@link InputFormat} implementation for the map-reduce job.
+ *
+ * @param theClass the {@link InputFormat} implementation for the map-reduce
+ * job.
+ */
public void setInputFormat(Class<? extends InputFormat> theClass) {
setClass("mapred.input.format.class", theClass, InputFormat.class);
}
+
+ /**
+ * Get the {@link OutputFormat} implementation for the map-reduce job,
+ * defaults to {@link TextOutputFormat} if not specified explicitly.
+ *
+ * @return the {@link OutputFormat} implementation for the map-reduce job.
+ */
public OutputFormat getOutputFormat() {
return (OutputFormat)ReflectionUtils.newInstance(getClass("mapred.output.format.class",
TextOutputFormat.class,
OutputFormat.class),
this);
}
+
+ /**
+ * Set the {@link OutputFormat} implementation for the map-reduce job.
+ *
+ * @param theClass the {@link OutputFormat} implementation for the map-reduce
+ * job.
+ */
public void setOutputFormat(Class<? extends OutputFormat> theClass) {
setClass("mapred.output.format.class", theClass, OutputFormat.class);
}
@@ -320,6 +498,7 @@
/**
* Should the map outputs be compressed before transfer?
* Uses the SequenceFile compression.
+ *
* @param compress should the map outputs be compressed?
*/
public void setCompressMapOutput(boolean compress) {
@@ -328,8 +507,9 @@
/**
* Are the outputs of the maps be compressed?
+ *
* @return <code>true</code> if the outputs of the maps are to be compressed,
- * <code>false</code> otherwise
+ * <code>false</code> otherwise.
*/
public boolean getCompressMapOutput() {
return getBoolean("mapred.compress.map.output", false);
@@ -337,8 +517,9 @@
/**
* Set the {@link CompressionType} for the map outputs.
+ *
* @param style the {@link CompressionType} to control how the map outputs
- * are compressed
+ * are compressed.
*/
public void setMapOutputCompressionType(CompressionType style) {
set("mapred.map.output.compression.type", style.toString());
@@ -346,8 +527,9 @@
/**
* Get the {@link CompressionType} for the map outputs.
+ *
* @return the {@link CompressionType} for map outputs, defaulting to
- * {@link CompressionType#RECORD}
+ * {@link CompressionType#RECORD}.
*/
public CompressionType getMapOutputCompressionType() {
String val = get("mapred.map.output.compression.type",
@@ -357,8 +539,9 @@
/**
* Set the given class as the {@link CompressionCodec} for the map outputs.
+ *
* @param codecClass the {@link CompressionCodec} class that will compress
- * the map outputs
+ * the map outputs.
*/
public void
setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
@@ -368,9 +551,10 @@
/**
* Get the {@link CompressionCodec} for compressing the map outputs.
+ *
* @param defaultValue the {@link CompressionCodec} to return if not set
* @return the {@link CompressionCodec} class that should be used to compress the
- * map outputs
+ * map outputs.
* @throws IllegalArgumentException if the class was specified, but not found
*/
public Class<? extends CompressionCodec>
@@ -390,10 +574,10 @@
/**
* Get the key class for the map output data. If it is not set, use the
- * (final) output ket class This allows the map output key class to be
- * different than the final output key class
- *
- * @return map output key class
+ * (final) output key class. This allows the map output key class to be
+ * different than the final output key class.
+ *
+ * @return the map output key class.
*/
public Class<? extends WritableComparable> getMapOutputKeyClass() {
Class<? extends WritableComparable> retv = getClass("mapred.mapoutput.key.class", null,
@@ -407,7 +591,9 @@
/**
* Set the key class for the map output data. This allows the user to
* specify the map output key class to be different than the final output
- * value class
+ * value class.
+ *
+ * @param theClass the map output key class.
*/
public void setMapOutputKeyClass(Class<? extends WritableComparable> theClass) {
setClass("mapred.mapoutput.key.class", theClass,
@@ -417,9 +603,9 @@
/**
* Get the value class for the map output data. If it is not set, use the
* (final) output value class This allows the map output value class to be
- * different than the final output value class
- *
- * @return map output value class
+ * different than the final output value class.
+ *
+ * @return the map output value class.
*/
public Class<? extends Writable> getMapOutputValueClass() {
Class<? extends Writable> retv = getClass("mapred.mapoutput.value.class", null,
@@ -433,21 +619,38 @@
/**
* Set the value class for the map output data. This allows the user to
* specify the map output value class to be different than the final output
- * value class
+ * value class.
+ *
+ * @param theClass the map output value class.
*/
public void setMapOutputValueClass(Class<? extends Writable> theClass) {
setClass("mapred.mapoutput.value.class", theClass, Writable.class);
}
+ /**
+ * Get the key class for the job output data.
+ *
+ * @return the key class for the job output data.
+ */
public Class<? extends WritableComparable> getOutputKeyClass() {
return getClass("mapred.output.key.class",
LongWritable.class, WritableComparable.class);
}
+ /**
+ * Set the key class for the job output data.
+ *
+ * @param theClass the key class for the job output data.
+ */
public void setOutputKeyClass(Class<? extends WritableComparable> theClass) {
setClass("mapred.output.key.class", theClass, WritableComparable.class);
}
+ /**
+ * Get the {@link WritableComparable} comparator used to compare keys.
+ *
+ * @return the {@link WritableComparable} comparator used to compare keys.
+ */
public WritableComparator getOutputKeyComparator() {
Class theClass = getClass("mapred.output.key.comparator.class", null,
WritableComparator.class);
@@ -456,17 +659,24 @@
return WritableComparator.get(getMapOutputKeyClass());
}
+ /**
+ * Set the {@link WritableComparable} comparator used to compare keys.
+ *
+ * @param theClass the {@link WritableComparable} comparator used to
+ * compare keys.
+ * @see #setOutputValueGroupingComparator(Class)
+ */
public void setOutputKeyComparatorClass(Class<? extends WritableComparator> theClass) {
setClass("mapred.output.key.comparator.class",
theClass, WritableComparator.class);
}
- /** Get the user defined comparator for grouping values.
+ /**
+ * Get the user defined {@link WritableComparable} comparator for
+ * grouping keys of inputs to the reduce.
*
- * This call is used to get the comparator for grouping values by key.
- * @see #setOutputValueGroupingComparator(Class) for details.
- *
- * @return Comparator set by the user for grouping values.
+ * @return comparator set by the user for grouping values.
+ * @see #setOutputValueGroupingComparator(Class) for details.
*/
public WritableComparator getOutputValueGroupingComparator() {
Class theClass = getClass("mapred.output.value.groupfn.class", null,
@@ -478,120 +688,310 @@
return (WritableComparator)ReflectionUtils.newInstance(theClass, this);
}
- /** Set the user defined comparator for grouping values.
+ /**
+ * Set the user defined {@link WritableComparable} comparator for
+ * grouping keys in the input to the reduce.
*
- * For key-value pairs (K1,V1) and (K2,V2), the values are passed
- * in a single call to the map function if K1 and K2 compare as equal.
+ * <p>This comparator should be provided if the equivalence rules for keys
+ * for sorting the intermediates are different from those for grouping keys
+ * before each call to
+ * {@link Reducer#reduce(WritableComparable, java.util.Iterator, OutputCollector, Reporter)}.</p>
+ *
+ * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
+ * in a single call to the reduce function if K1 and K2 compare as equal.</p>
*
- * This comparator should be provided if the equivalence rules for keys
- * for sorting the intermediates are different from those for grouping
- * values.
+ * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control
+ * how keys are sorted, this can be used in conjunction to simulate
+ * <i>secondary sort on values</i>.</p>
+ *
+ * <p><i>Note</i>: This is not a guarantee of the reduce sort being
+ * <i>stable</i> in any sense. (In any case, with the order of available
+ * map-outputs to the reduce being non-deterministic, it wouldn't make
+ * that much sense.)</p>
*
- * @param theClass The Comparator class to be used for grouping. It should
- * extend WritableComparator.
+ * @param theClass the comparator class to be used for grouping keys.
+ * It should extend <code>WritableComparator</code>.
+ * @see #setOutputKeyComparatorClass(Class)
*/
public void setOutputValueGroupingComparator(Class theClass) {
setClass("mapred.output.value.groupfn.class",
theClass, WritableComparator.class);
}
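
A hedged sketch of the secondary-sort technique described above; the
composite "natural#secondary" key layout and class name are illustrative:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class NaturalKeyGroupingComparator extends WritableComparator {
      public NaturalKeyGroupingComparator() {
        super(Text.class);
      }
      public int compare(WritableComparable a, WritableComparable b) {
        // Group reduce inputs on the natural part of a "natural#secondary"
        // key; setOutputKeyComparatorClass still sorts on the full key.
        String x = a.toString(), y = b.toString();
        return x.split("#")[0].compareTo(y.split("#")[0]);
      }
    }

    // job.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);
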
+ /**
+ * Get the value class for job outputs.
+ *
+ * @return the value class for job outputs.
+ */
public Class<? extends Writable> getOutputValueClass() {
return getClass("mapred.output.value.class", Text.class, Writable.class);
}
+
+ /**
+ * Set the value class for job outputs.
+ *
+ * @param theClass the value class for job outputs.
+ */
public void setOutputValueClass(Class<? extends Writable> theClass) {
setClass("mapred.output.value.class", theClass, Writable.class);
}
-
+ /**
+ * Get the {@link Mapper} class for the job.
+ *
+ * @return the {@link Mapper} class for the job.
+ */
public Class<? extends Mapper> getMapperClass() {
return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
}
+
+ /**
+ * Set the {@link Mapper} class for the job.
+ *
+ * @param theClass the {@link Mapper} class for the job.
+ */
public void setMapperClass(Class<? extends Mapper> theClass) {
setClass("mapred.mapper.class", theClass, Mapper.class);
}
+ /**
+ * Get the {@link MapRunnable} class for the job.
+ *
+ * @return the {@link MapRunnable} class for the job.
+ */
public Class<? extends MapRunnable> getMapRunnerClass() {
return getClass("mapred.map.runner.class",
MapRunner.class, MapRunnable.class);
}
+
+ /**
+ * Expert: Set the {@link MapRunnable} class for the job.
+ *
+ * Typically used to exert greater control on {@link Mapper}s.
+ *
+ * @param theClass the {@link MapRunnable} class for the job.
+ */
public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
setClass("mapred.map.runner.class", theClass, MapRunnable.class);
}
+ /**
+ * Get the {@link Partitioner} used to partition {@link Mapper}-outputs
+ * to be sent to the {@link Reducer}s.
+ *
+ * @return the {@link Partitioner} used to partition map-outputs.
+ */
public Class<? extends Partitioner> getPartitionerClass() {
return getClass("mapred.partitioner.class",
HashPartitioner.class, Partitioner.class);
}
+
+ /**
+ * Set the {@link Partitioner} class used to partition
+ * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
+ *
+ * @param theClass the {@link Partitioner} used to partition map-outputs.
+ */
public void setPartitionerClass(Class<? extends Partitioner> theClass) {
setClass("mapred.partitioner.class", theClass, Partitioner.class);
}
+ /**
+ * Get the {@link Reducer} class for the job.
+ *
+ * @return the {@link Reducer} class for the job.
+ */
public Class<? extends Reducer> getReducerClass() {
return getClass("mapred.reducer.class",
IdentityReducer.class, Reducer.class);
}
+
+ /**
+ * Set the {@link Reducer} class for the job.
+ *
+ * @param theClass the {@link Reducer} class for the job.
+ */
public void setReducerClass(Class<? extends Reducer> theClass) {
setClass("mapred.reducer.class", theClass, Reducer.class);
}
+ /**
+ * Get the user-defined <i>combiner</i> class used to combine map-outputs
+ * before being sent to the reducers. Typically the combiner is the same as
+ * the {@link Reducer} for the job, i.e. {@link #getReducerClass()}.
+ *
+ * @return the user-defined combiner class used to combine map-outputs.
+ */
public Class<? extends Reducer> getCombinerClass() {
return getClass("mapred.combiner.class", null, Reducer.class);
}
+
+ /**
+ * Set the user-defined <i>combiner</i> class used to combine map-outputs
+ * before being sent to the reducers.
+ *
+ * <p>The combiner is a task-level aggregation operation which, in some cases,
+ * helps to cut down the amount of data transferred from the {@link Mapper} to
+ * the {@link Reducer}, leading to better performance.</p>
+ *
+ * <p>Typically the combiner is the same as the <code>Reducer</code> for the
+ * job i.e. {@link #setReducerClass(Class)}.</p>
+ *
+ * @param theClass the user-defined combiner class used to combine
+ * map-outputs.
+ */
public void setCombinerClass(Class<? extends Reducer> theClass) {
setClass("mapred.combiner.class", theClass, Reducer.class);
}
/**
- * Should speculative execution be used for this job?
- * @return Defaults to true
+ * Should speculative execution be used for this job?
+ * Defaults to <code>true</code>.
+ *
+ * @return <code>true</code> if speculative execution be used for this job,
+ * <code>false</code> otherwise.
*/
public boolean getSpeculativeExecution() {
return getBoolean("mapred.speculative.execution", true);
}
/**
- * Turn on or off speculative execution for this job.
- * In general, it should be turned off for map jobs that have side effects.
+ * Turn speculative execution on or off for this job.
+ *
+ * @param speculativeExecution <code>true</code> if speculative execution
+ * should be turned on, else <code>false</code>.
*/
- public void setSpeculativeExecution(boolean new_val) {
- setBoolean("mapred.speculative.execution", new_val);
+ public void setSpeculativeExecution(boolean speculativeExecution) {
+ setBoolean("mapred.speculative.execution", speculativeExecution);
}
+ /**
+ * Get the configured number of map tasks for this job.
+ * Defaults to <code>1</code>.
+ *
+ * @return the number of map tasks for this job.
+ */
public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
+
+ /**
+ * Set the number of map tasks for this job.
+ *
+ * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual
+ * number of spawned map tasks depends on the number of {@link InputSplit}s
+ * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
+ *
+ * A custom {@link InputFormat} is typically used to accurately control
+ * the number of map tasks for the job.</p>
+ *
+ * <h4 id="NoOfMaps">How many maps?</h4>
+ *
+ * <p>The number of maps is usually driven by the total size of the inputs
+ * i.e. total number of blocks of the input files.</p>
+ *
+ * <p>The right level of parallelism for maps seems to be around 10-100 maps
+ * per-node, although it has been set up to 300 or so for very cpu-light map
+ * tasks. Task setup takes a while, so it is best if the maps take at least a
+ * minute to execute.</p>
+ *
+ * <p>The default behavior of file-based {@link InputFormat}s is to split the
+ * input into <i>logical</i> {@link InputSplit}s based on the total size, in
+ * bytes, of input files. However, the {@link FileSystem} blocksize of the
+ * input files is treated as an upper bound for input splits. A lower bound
+ * on the split size can be set via
+ * <a href="{@docRoot}/../hadoop-default.html#mapred.min.split.size">
+ * mapred.min.split.size</a>.</p>
+ *
+ * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB,
+ * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is
+ * used to set it even higher.</p>
+ *
+ * @param n the number of map tasks for this job.
+ * @see InputFormat#getSplits(JobConf, int)
+ * @see FileInputFormat
+ * @see FileSystem#getDefaultBlockSize()
+ * @see FileStatus#getBlockSize()
+ */
public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }
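
The 10TB/128MB figure quoted above checks out as follows (approximately
82,000 map tasks):

    long inputBytes = 10L * 1024 * 1024 * 1024 * 1024;  // 10TB of input
    long blockBytes = 128L * 1024 * 1024;               // 128MB blocksize
    long numMaps = inputBytes / blockBytes;             // 81,920 (~82,000)
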
+ /**
+ * Get the configured number of reduce tasks for this job. Defaults to
+ * <code>1</code>.
+ *
+ * @return the number of reduce tasks for this job.
+ */
public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); }
+
+ /**
+ * Set the requisite number of reduce tasks for this job.
+ *
+ * <h4 id="NoOfReduces">How many reduces?</h4>
+ *
+ * <p>The right number of reduces seems to be <code>0.95</code> or
+ * <code>1.75</code> multiplied by (<<i>no. of nodes</i>> *
+ * <a href="{@docRoot}/../hadoop-default.html#mapred.tasktracker.tasks.maximum">
+ * mapred.tasktracker.tasks.maximum</a>).
+ * </p>
+ *
+ * <p>With <code>0.95</code> all of the reduces can launch immediately and
+ * start transferring map outputs as the maps finish. With <code>1.75</code>
+ * the faster nodes will finish their first round of reduces and launch a
+ * second wave of reduces doing a much better job of load balancing.</p>
+ *
+ * <p>Increasing the number of reduces increases the framework overhead, but
+ * increases load balancing and lowers the cost of failures.</p>
+ *
+ * <p>The scaling factors above are slightly less than whole numbers to
+ * reserve a few reduce slots in the framework for speculative-tasks, failures
+ * etc.</p>
+ *
+ * <h4 id="ReducerNone">Reducer NONE</h4>
+ *
+ * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
+ *
+ * <p>In this case the output of the map-tasks goes directly to the distributed
+ * file-system, to the path set by {@link #setOutputPath(Path)}. Also, the
+ * framework doesn't sort the map-outputs before writing them out to HDFS.</p>
+ *
+ * @param n the number of reduce tasks for this job.
+ */
public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }
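
A hedged sketch of the 0.95/1.75 heuristics above; the node count and
per-tracker task maximum are illustrative values, not read from the
configuration here:

    int nodes = 100;
    int maxTasksPerTracker = 2;  // mapred.tasktracker.tasks.maximum
    int singleWave = (int) (0.95 * nodes * maxTasksPerTracker);  // 190 reduces
    int doubleWave = (int) (1.75 * nodes * maxTasksPerTracker);  // 350 reduces
    job.setNumReduceTasks(singleWave);
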
- /** Get the configured number of maximum attempts that will be made to run a
- * map task, as specified by the <code>mapred.map.max.attempts</code>
- * property. If this property is not already set, the default is 4 attempts
- * @return the max number of attempts
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * map task, as specified by the <code>mapred.map.max.attempts</code>
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per map task.
*/
public int getMaxMapAttempts() {
return getInt("mapred.map.max.attempts", 4);
}
- /** Expert: Set the number of maximum attempts that will be made to run a
- * map task
- * @param n the number of attempts
- *
+
+ /**
+ * Expert: Set the number of maximum attempts that will be made to run a
+ * map task.
+ *
+ * @param n the number of attempts per map task.
*/
public void setMaxMapAttempts(int n) {
setInt("mapred.map.max.attempts", n);
}
- /** Get the configured number of maximum attempts that will be made to run a
- * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
- * property. If this property is not already set, the default is 4 attempts
- * @return the max number of attempts
+ /**
+ * Get the configured number of maximum attempts that will be made to run a
+ * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
+ * property. If this property is not already set, the default is 4 attempts.
+ *
+ * @return the max number of attempts per reduce task.
*/
public int getMaxReduceAttempts() {
return getInt("mapred.reduce.max.attempts", 4);
}
- /** Expert: Set the number of maximum attempts that will be made to run a
- * reduce task
- * @param n the number of attempts
- *
+ /**
+ * Expert: Set the number of maximum attempts that will be made to run a
+ * reduce task.
+ *
+ * @param n the number of attempts per reduce task.
*/
public void setMaxReduceAttempts(int n) {
setInt("mapred.reduce.max.attempts", n);
@@ -600,7 +1000,8 @@
/**
* Get the user-specified job name. This is only used to identify the
* job to the user.
- * @return the job's name, defaulting to ""
+ *
+ * @return the job's name, defaulting to "".
*/
public String getJobName() {
return get("mapred.job.name", "");
@@ -608,7 +1009,8 @@
/**
* Set the user-specified job name.
- * @param name the job's new name
+ *
+ * @param name the job's new name.
*/
public void setJobName(String name) {
set("mapred.job.name", name);
@@ -627,16 +1029,16 @@
* When not running under HOD, this identifier is expected to remain set to
* the empty string.
*
- * @return the session identifier, defaulting to ""
+ * @return the session identifier, defaulting to "".
*/
public String getSessionId() {
return get("session.id", "");
}
/**
- * Set the user-specified session idengifier.
+ * Set the user-specified session identifier.
*
- * @param sessionId the new session id
+ * @param sessionId the new session id.
*/
public void setSessionId(String sessionId) {
set("session.id", sessionId);
@@ -644,6 +1046,8 @@
/**
* Set the maximum no. of failures of a given job per tasktracker.
+ * If the no. of task failures exceeds <code>noFailures</code>, the
+ * tasktracker is <i>blacklisted</i> for this job.
*
* @param noFailures maximum no. of failures of a given job per tasktracker.
*/
@@ -652,7 +1056,9 @@
}
/**
- * Get the maximum no. of failures of a given job per tasktracker.
+ * Expert: Get the maximum no. of failures of a given job per tasktracker.
+ * If the no. of task failures exceeds this, the tasktracker is
+ * <i>blacklisted</i> for this job.
*
* @return the maximum no. of failures of a given job per tasktracker.
*/
@@ -662,21 +1068,30 @@
/**
* Get the maximum percentage of map tasks that can fail without
- * the job being aborted.
+ * the job being aborted.
+ *
+ * Each map task is executed a minimum of {@link #getMaxMapAttempts()}
+ * attempts before being declared as <i>failed</i>.
+ *
+ * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
+ * the job being declared as {@link JobStatus#FAILED}.
*
* @return the maximum percentage of map tasks that can fail without
- * the job being aborted
+ * the job being aborted.
*/
public int getMaxMapTaskFailuresPercent() {
return getInt("mapred.max.map.failures.percent", 0);
}
/**
- * Set the maximum percentage of map tasks that can fail without the job
- * being aborted.
+ * Expert: Set the maximum percentage of map tasks that can fail without the
+ * job being aborted.
+ *
+ * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts
+ * before being declared as <i>failed</i>.
*
* @param percent the maximum percentage of map tasks that can fail without
- * the job being aborted
+ * the job being aborted.
*/
public void setMaxMapTaskFailuresPercent(int percent) {
setInt("mapred.max.map.failures.percent", percent);
@@ -684,10 +1099,16 @@
/**
* Get the maximum percentage of reduce tasks that can fail without
- * the job being aborted.
+ * the job being aborted.
+ *
+ * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
+ * attempts before being declared as <i>failed</i>.
+ *
+ * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results
+ * in the job being declared as {@link JobStatus#FAILED}.
*
* @return the maximum percentage of reduce tasks that can fail without
- * the job being aborted
+ * the job being aborted.
*/
public int getMaxReduceTaskFailuresPercent() {
return getInt("mapred.max.reduce.failures.percent", 0);
@@ -697,24 +1118,29 @@
* Set the maximum percentage of reduce tasks that can fail without the job
* being aborted.
*
+ * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()}
+ * attempts before being declared as <i>failed</i>.
+ *
* @param percent the maximum percentage of reduce tasks that can fail without
- * the job being aborted
+ * the job being aborted.
*/
public void setMaxReduceTaskFailuresPercent(int percent) {
setInt("mapred.max.reduce.failures.percent", percent);
}
/**
- * Set job priority for this job.
+ * Set {@link JobPriority} for this job.
*
- * @param prio
+ * @param prio the {@link JobPriority} for this job.
*/
public void setJobPriority(JobPriority prio) {
set("mapred.job.priority", prio.toString());
}
/**
- * Get the job priority for this job.
+ * Get the {@link JobPriority} for this job.
+ *
+ * @return the {@link JobPriority} for this job.
*/
public JobPriority getJobPriority() {
String prio = get("mapred.job.priority");
@@ -725,12 +1151,44 @@
return JobPriority.valueOf(prio);
}
+ /**
+ * Get the uri to be invoked in order to send a notification after the job
+ * has completed (success/failure).
+ *
+ * @return the job end notification uri, <code>null</code> if it hasn't
+ * been set.
+ * @see #setJobEndNotificationURI(String)
+ */
+ public String getJobEndNotificationURI() {
+ return get("job.end.notification.url");
+ }
+
+ /**
+ * Set the uri to be invoked in order to send a notification after the job
+ * has completed (success/failure).
+ *
+ * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and
+ * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's
+ * identifier and completion-status respectively.</p>
+ *
+ * <p>This is typically used by application-writers to implement chaining of
+ * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
+ *
+ * @param uri the job end notification uri
+ * @see JobStatus
+ * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#JobCompletionAndChaining">Job Completion and Chaining</a>
+ */
+ public void setJobEndNotificationURI(String uri) {
+ set("job.end.notification.url", uri);
+ }
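
A hedged usage sketch: $jobId and $jobStatus are expanded by the framework
as described above; the host and path are illustrative:

    job.setJobEndNotificationURI(
        "http://workflow.example.com/notify?jobId=$jobId&status=$jobStatus");
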
- /** Find a jar that contains a class of the same name, if any.
+ /**
+ * Find a jar that contains a class of the same name, if any.
* It will return a jar file, even if that is not the first thing
* on the class path that has a class with the same name.
- * @param my_class the class to find
- * @return a jar file that contains the class, or null
+ *
+ * @param my_class the class to find.
+ * @return a jar file that contains the class, or null.
* @throws IOException
*/
private static String findContainingJar(Class my_class) {
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobEndNotifier.java Wed Oct 24 14:24:51 2007
@@ -95,7 +95,7 @@
private static JobEndStatusInfo createNotification(JobConf conf,
JobStatus status) {
JobEndStatusInfo notification = null;
- String uri = conf.get("job.end.notification.url");
+ String uri = conf.getJobEndNotificationURI();
if (uri != null) {
// +1 to make logic for first notification identical to a retry
int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/MapReduceBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/MapReduceBase.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/MapReduceBase.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/MapReduceBase.java Wed Oct 24 14:24:51 2007
@@ -23,8 +23,11 @@
import org.apache.hadoop.io.Closeable;
import org.apache.hadoop.mapred.JobConfigurable;
-/** Base class for {@link Mapper} and {@link Reducer} implementations.
- * Provides default implementations for a few methods.
+/**
+ * Base class for {@link Mapper} and {@link Reducer} implementations.
+ *
+ * <p>Provides default no-op implementations for a few methods; most non-trivial
+ * applications need to override some of them.</p>
*/
public class MapReduceBase implements Closeable, JobConfigurable {
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/MapRunnable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/MapRunnable.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/MapRunnable.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/MapRunnable.java Wed Oct 24 14:24:51 2007
@@ -23,15 +23,28 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-/** Expert: Permits greater control of map processing. For example,
- * implementations might perform multi-threaded, asynchronous mappings. */
+/**
+ * Expert: Generic interface for {@link Mapper}s.
+ *
+ * <p>Custom implementations of <code>MapRunnable</code> can exert greater
+ * control on map processing e.g. multi-threaded, asynchronous mappers etc.</p>
+ *
+ * @see Mapper
+ */
public interface MapRunnable<K1 extends WritableComparable, V1 extends Writable,
K2 extends WritableComparable, V2 extends Writable>
extends JobConfigurable {
- /** Called to execute mapping. Mapping is complete when this returns.
- * @param input the {@link RecordReader} with input key/value pairs.
- * @param output the {@link OutputCollector} for mapped key/value pairs.
+ /**
+ * Start mapping input <tt><key, value></tt> pairs.
+ *
+ * <p>Mapping of input records to output records is complete when this method
+ * returns.</p>
+ *
+ * @param input the {@link RecordReader} to read the input records.
+ * @param output the {@link OutputCollector} to collect the output records.
+ * @param reporter {@link Reporter} to report progress, status-updates etc.
+ * @throws IOException
*/
void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
Reporter reporter)
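
A hedged sketch of a custom MapRunnable mirroring the default MapRunner
loop (the class name is illustrative); a multi-threaded variant would hand
each pair to a pool of worker threads:

    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapred.*;
    import org.apache.hadoop.util.ReflectionUtils;

    public class MyMapRunner<K1 extends WritableComparable, V1 extends Writable,
                             K2 extends WritableComparable, V2 extends Writable>
        implements MapRunnable<K1, V1, K2, V2> {
      private Mapper<K1, V1, K2, V2> mapper;

      public void configure(JobConf job) {
        mapper = (Mapper<K1, V1, K2, V2>)
            ReflectionUtils.newInstance(job.getMapperClass(), job);
      }

      public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                      Reporter reporter) throws IOException {
        try {
          K1 key = input.createKey();
          V1 value = input.createValue();
          while (input.next(key, value)) {
            mapper.map(key, value, output, reporter);  // one call per record
          }
        } finally {
          mapper.close();
        }
      }
    }
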
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Mapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Mapper.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Mapper.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Mapper.java Wed Oct 24 14:24:51 2007
@@ -20,29 +20,141 @@
import java.io.IOException;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Closeable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.compress.CompressionCodec;
-/** Maps input key/value pairs to a set of intermediate key/value pairs. All
- * intermediate values associated with a given output key are subsequently
- * grouped by the map/reduce system, and passed to a {@link Reducer} to
- * determine the final output.. */
+/**
+ * Maps input key/value pairs to a set of intermediate key/value pairs.
+ *
+ * <p>Maps are the individual tasks which transform input records into
+ * intermediate records. The transformed intermediate records need not be of
+ * the same type as the input records. A given input pair may map to zero or
+ * many output pairs.</p>
+ *
+ * <p>The Hadoop Map-Reduce framework spawns one map task for each
+ * {@link InputSplit} generated by the {@link InputFormat} for the job.
+ * <code>Mapper</code> implementations can access the {@link JobConf} for the
+ * job via the {@link JobConfigurable#configure(JobConf)} and initialize
+ * themselves. Similarly they can use the {@link Closeable#close()} method for
+ * de-initialization.</p>
+ *
+ * <p>The framework then calls
+ * {@link #map(WritableComparable, Writable, OutputCollector, Reporter)}
+ * for each key/value pair in the <code>InputSplit</code> for that task.</p>
+ *
+ * <p>All intermediate values associated with a given output key are
+ * subsequently grouped by the framework, and passed to a {@link Reducer} to
+ * determine the final output. Users can control the grouping by specifying
+ * a <code>Comparator</code> via
+ * {@link JobConf#setOutputKeyComparatorClass(Class)}.</p>
+ *
+ * <p>The grouped <code>Mapper</code> outputs are partitioned per
+ * <code>Reducer</code>. Users can control which keys (and hence records) go to
+ * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
+ *
+ * <p>Users can optionally specify a <code>combiner</code>, via
+ * {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the
+ * intermediate outputs, which helps to cut down the amount of data transferred
+ * from the <code>Mapper</code> to the <code>Reducer</code>.
+ *
+ * <p>The intermediate, grouped outputs are always stored in
+ * {@link SequenceFile}s. Applications can specify if and how the intermediate
+ * outputs are to be compressed and which {@link CompressionCodec}s are to be
+ * used via the <code>JobConf</code>.</p>
+ *
+ * <p>If the job has
+ * <a href="{@docRoot}/org/apache/hadoop/mapred/JobConf.html#ReducerNone">zero
+ * reduces</a> then the output of the <code>Mapper</code> is directly written
+ * to the {@link FileSystem} without grouping by keys.</p>
+ *
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ * public class MyMapper<K extends WritableComparable, V extends Writable>
+ * extends MapReduceBase implements Mapper<K, V, K, V> {
+ *
+ * static enum MyCounters { NUM_RECORDS }
+ *
+ * private String mapTaskId;
+ * private String inputFile;
+ * private int noRecords = 0;
+ *
+ * public void configure(JobConf job) {
+ * mapTaskId = job.get("mapred.task.id");
+ * inputFile = job.get("mapred.input.file");
+ * }
+ *
+ * public void map(K key, V val,
+ * OutputCollector<K, V> output, Reporter reporter)
+ * throws IOException {
+ * // Process the <key, value> pair (assume this takes a while)
+ * // ...
+ * // ...
+ *
+ * // Let the framework know that we are alive, and kicking!
+ * // reporter.progress();
+ *
+ * // Process some more
+ * // ...
+ * // ...
+ *
+ * // Increment the no. of <key, value> pairs processed
+ * ++noRecords;
+ *
+ * // Increment counters
+ * reporter.incrCounter(NUM_RECORDS, 1);
+ *
+ * // Every 100 records update application-level status
+ * if ((noRecords%100) == 0) {
+ * reporter.setStatus(mapTaskId + " processed " + noRecords +
+ * " from input-file: " + inputFile);
+ * }
+ *
+ * // Output the result
+ * output.collect(key, val);
+ * }
+ * }
+ * </pre></blockquote></p>
+ * <p>Applications may write a custom {@link MapRunnable} to exert greater
+ * control on map processing e.g. multi-threaded <code>Mapper</code>s etc.</p>
+ *
+ * @see JobConf
+ * @see InputFormat
+ * @see Partitioner
+ * @see Reducer
+ * @see MapReduceBase
+ * @see MapRunnable
+ * @see SequenceFile
+ */
public interface Mapper<K1 extends WritableComparable, V1 extends Writable,
K2 extends WritableComparable, V2 extends Writable>
extends JobConfigurable, Closeable {
- /** Maps a single input key/value pair into intermediate key/value pairs.
- * Output pairs need not be of the same types as input pairs. A given input
- * pair may map to zero or many output pairs. Output pairs are collected
- * with calls to {@link
- * OutputCollector#collect(WritableComparable,Writable)}.
+ /**
+ * Maps a single input key/value pair into an intermediate key/value pair.
+ *
+ * <p>Output pairs need not be of the same types as input pairs. A given
+ * input pair may map to zero or many output pairs. Output pairs are
+ * collected with calls to
+ * {@link OutputCollector#collect(WritableComparable,Writable)}.</p>
*
- * @param key the key
- * @param value the values
- * @param output collects mapped keys and values
+ * <p>Applications can use the {@link Reporter} provided to report progress
+ * or just indicate that they are alive. In scenarios where the application
+ * takes an insignificant amount of time to process individual key/value
+ * pairs, this is crucial since the framework might assume that the task has
+ * timed-out and kill that task. The other way of avoiding this is to set
+ * <a href="{@docRoot}/../hadoop-default.html#mapred.task.timeout">
+ * mapred.task.timeout</a> to a high-enough value (or even zero for no
+ * time-outs).</p>
+ *
+ * @param key the input key.
+ * @param value the input value.
+ * @param output collects mapped keys and values.
+ * @param reporter facility to report progress.
*/
- void map(K1 key, V1 value,
- OutputCollector<K2, V2> output, Reporter reporter)
- throws IOException;
+ void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
+ throws IOException;
}
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/OutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/OutputCollector.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/OutputCollector.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/OutputCollector.java Wed Oct 24 14:24:51 2007
@@ -24,15 +24,23 @@
import org.apache.hadoop.io.WritableComparable;
-/** Passed to {@link Mapper} and {@link Reducer} implementations to collect
- * output data. */
+/**
+ * Collects the <code><key, value></code> pairs output by {@link Mapper}s
+ * and {@link Reducer}s.
+ *
+ * <p><code>OutputCollector</code> is the generalization of the facility
+ * provided by the Map-Reduce framework to collect data output by either the
+ * <code>Mapper</code> or the <code>Reducer</code> i.e. intermediate outputs
+ * or the output of the job.</p>
+ */
public interface OutputCollector<K extends WritableComparable,
V extends Writable> {
/** Adds a key/value pair to the output.
*
- * @param key the key to add
- * @param value to value to add
+ * @param key the key to collect.
+ * @param value the value to collect.
+ * @throws IOException
*/
void collect(K key, V value) throws IOException;
}
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/OutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/OutputFormat.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/OutputFormat.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/OutputFormat.java Wed Oct 24 14:24:51 2007
@@ -25,28 +25,53 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.Progressable;
-/** An output data format. Output files are stored in a {@link
- * FileSystem}. */
+/**
+ * <code>OutputFormat</code> describes the output-specification for a
+ * Map-Reduce job.
+ *
+ * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
+ * job to:</p>
+ * <ol>
+ * <li>
+ * Validate the output-specification of the job, e.g. check that the
+ * output directory doesn't already exist.
+ * </li>
+ * <li>
+ * Provide the {@link RecordWriter} implementation to be used to write out
+ * the output files of the job. Output files are stored in a
+ * {@link FileSystem}.
+ * </li>
+ * </ol>
+ *
+ * @see RecordWriter
+ * @see JobConf
+ */
public interface OutputFormat<K extends WritableComparable,
V extends Writable> {
- /** Construct a {@link RecordWriter} with Progressable.
+ /**
+ * Get the {@link RecordWriter} for the given job.
*
- * @param job the job whose output is being written
- * @param name the unique name for this part of the output
- * @param progress mechanism for reporting progress while writing to file
- * @return a {@link RecordWriter}
+ * @param ignored
+ * @param job configuration for the job whose output is being written.
+ * @param name the unique name for this part of the output.
+ * @param progress mechanism for reporting progress while writing to file.
+ * @return a {@link RecordWriter} to write the output for the job.
+ * @throws IOException
*/
RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress)
- throws IOException;
+ throws IOException;
- /** Check whether the output specification for a job is appropriate. Called
- * when a job is submitted. Typically checks that it does not already exist,
+ /**
+ * Check for validity of the output-specification for the job.
+ *
+ * <p>This is to validate the output specification for the job when it is
+ * submitted. Typically this checks that the output does not already exist,
* throwing an exception when it already exists, so that output is not
- * overwritten.
+ * overwritten.</p>
*
- * @param job the job whose output will be written
+ * @param ignored
+ * @param job job configuration.
* @throws IOException when output should not be attempted
*/
void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
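
A hedged sketch of the validation described above, refusing to run when the
output directory already exists so existing output is never overwritten:

    public void checkOutputSpecs(FileSystem ignored, JobConf job)
        throws IOException {
      Path outDir = job.getOutputPath();
      if (outDir == null && job.getNumReduceTasks() != 0) {
        throw new IOException("Output directory not set in JobConf.");
      }
      if (outDir != null && outDir.getFileSystem(job).exists(outDir)) {
        throw new IOException("Output directory " + outDir + " already exists.");
      }
    }
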
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Partitioner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Partitioner.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Partitioner.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Partitioner.java Wed Oct 24 14:24:51 2007
@@ -21,18 +21,32 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-/** Partitions the key space. A partition is created for each reduce task. */
+/**
+ * Partitions the key space.
+ *
+ * <p><code>Partitioner</code> controls the partitioning of the keys of the
+ * intermediate map-outputs. The key (or a subset of the key) is used to derive
+ * the partition, typically by a hash function. The total number of partitions
+ * is the same as the number of reduce tasks for the job. Hence this controls
+ * which of the <code>m</code> reduce tasks the intermediate key (and hence the
+ * record) is sent to for reduction.</p>
+ *
+ * @see Reducer
+ */
public interface Partitioner<K2 extends WritableComparable,
V2 extends Writable>
extends JobConfigurable {
- /** Returns the paritition number for a given entry given the total number of
- * partitions. Typically a hash function on a all or a subset of the key.
+ /**
+ * Get the partition number for a given key (hence record) given the total
+ * number of partitions i.e. number of reduce-tasks for the job.
+ *
+ * <p>Typically a hash function on all or a subset of the key.</p>
*
- * @param key the entry key
- * @param value the entry value
- * @param numPartitions the number of partitions
- * @return the partition number
+ * @param key the key to be partitioned.
+ * @param value the entry value.
+ * @param numPartitions the total number of partitions.
+ * @return the partition number for the <code>key</code>.
*/
int getPartition(K2 key, V2 value, int numPartitions);
}
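
A hedged sketch of a custom Partitioner routing records by the first
character of the key; the class name and scheme are illustrative:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    public class FirstCharPartitioner implements Partitioner<Text, Writable> {
      public void configure(JobConf job) {}

      public int getPartition(Text key, Writable value, int numPartitions) {
        String s = key.toString();
        int c = (s.length() == 0) ? 0 : s.charAt(0);
        return (c & Integer.MAX_VALUE) % numPartitions;  // non-negative index
      }
    }
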
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RecordReader.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RecordReader.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RecordReader.java Wed Oct 24 14:24:51 2007
@@ -24,11 +24,23 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-/** Reads key/value pairs from an input file {@link FileSplit}.
- * Implemented by {@link InputFormat} implementations. */
+/**
+ * <code>RecordReader</code> reads <key, value> pairs from an
+ * {@link InputSplit}.
+ *
+ * <p><code>RecordReader</code>, typically, converts the byte-oriented view of
+ * the input, provided by the <code>InputSplit</code>, and presents a
+ * record-oriented view for the {@link Mapper} & {@link Reducer} tasks for
+ * processing. It thus assumes the responsibility of processing record
+ * boundaries and presenting the tasks with keys and values.</p>
+ *
+ * @see InputSplit
+ * @see InputFormat
+ */
public interface RecordReader<K extends WritableComparable,
V extends Writable> {
- /** Reads the next key/value pair.
+ /**
+ * Reads the next key/value pair from the input for processing.
*
* @param key the key to read data into
* @param value the value to read data into
@@ -40,25 +52,38 @@
/**
* Create an object of the appropriate type to be used as a key.
- * @return a new key object
+ *
+ * @return a new key object.
*/
K createKey();
/**
- * Create an object of the appropriate type to be used as the value.
- * @return a new value object
+ * Create an object of the appropriate type to be used as a value.
+ *
+ * @return a new value object.
*/
V createValue();
- /** Returns the current position in the input. */
+ /**
+ * Returns the current position in the input.
+ *
+ * @return the current position in the input.
+ * @throws IOException
+ */
long getPos() throws IOException;
- /** Close this to future operations.*/
+ /**
+ * Close this {@link RecordReader} to future operations.
+ *
+ * @throws IOException
+ */
public void close() throws IOException;
/**
- * How far has the reader gone through the input.
- * @return progress from 0.0 to 1.0
+ * How much of the input has the {@link RecordReader} consumed, i.e.
+ * how much of the input has been processed?
+ *
+ * @return progress from <code>0.0</code> to <code>1.0</code>.
* @throws IOException
*/
float getProgress() throws IOException;
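
Taken together, these methods define a simple iteration protocol. A minimal
sketch of the loop a caller (conceptually, the framework) runs against a
RecordReader, where 'reader' is assumed to come from
InputFormat.getRecordReader() and process() is a hypothetical stand-in for
the map call:

  K key = reader.createKey();
  V value = reader.createValue();
  while (reader.next(key, value)) {    // false signals the end of the split
    process(key, value);               // e.g. hand the pair to Mapper.map()
    // reader.getPos()/getProgress() may be polled here for status
  }
  reader.close();                      // release underlying resources
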
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RecordWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RecordWriter.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RecordWriter.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RecordWriter.java Wed Oct 24 14:24:51 2007
@@ -21,22 +21,36 @@
import java.io.IOException;
import java.io.DataOutput;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
-/** Writes key/value pairs to an output file. Implemented by {@link
- * OutputFormat} implementations. */
+/**
+ * <code>RecordWriter</code> writes the output <key, value> pairs
+ * to an output file.
+ *
+ * <p><code>RecordWriter</code> implementations write the job outputs to the
+ * {@link FileSystem}.</p>
+ *
+ * @see OutputFormat
+ */
public interface RecordWriter<K extends WritableComparable,
V extends Writable> {
- /** Writes a key/value pair.
- *
- * @param key the key to write
- * @param value the value to write
+ /**
+ * Writes a key/value pair.
*
+ * @param key the key to write.
+ * @param value the value to write.
+ * @throws IOException
* @see Writable#write(DataOutput)
*/
void write(K key, V value) throws IOException;
- /** Close this to future operations.*/
+ /**
+ * Close this <code>RecordWriter</code> to future operations.
+ *
+ * @param reporter facility to report progress.
+ * @throws IOException
+ */
void close(Reporter reporter) throws IOException;
}
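
A minimal sketch of an implementation, assuming a DataOutputStream already
opened on the job's output FileSystem (the class name LineRecordWriter is
illustrative); it writes one tab-separated line per record:

  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.RecordWriter;
  import org.apache.hadoop.mapred.Reporter;

  public class LineRecordWriter implements RecordWriter<Text, LongWritable> {
    private final DataOutputStream out;

    public LineRecordWriter(DataOutputStream out) {
      this.out = out;
    }

    public void write(Text key, LongWritable value) throws IOException {
      out.writeBytes(key.toString() + "\t" + value.get() + "\n");
    }

    public void close(Reporter reporter) throws IOException {
      out.close();   // no further write() calls are valid after this
    }
  }
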
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Reducer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Reducer.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Reducer.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Reducer.java Wed Oct 24 14:24:51 2007
@@ -22,24 +22,176 @@
import java.util.Iterator;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Closeable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-/** Reduces a set of intermediate values which share a key to a smaller set of
- * values. Input values are the grouped output of a {@link Mapper}. */
+/**
+ * Reduces a set of intermediate values which share a key to a smaller set of
+ * values.
+ *
+ * <p>The number of <code>Reducer</code>s for the job is set by the user via
+ * {@link JobConf#setNumReduceTasks(int)}. <code>Reducer</code> implementations
+ * can access the {@link JobConf} for the job via the
+ * {@link JobConfigurable#configure(JobConf)} method and initialize themselves.
+ * Similarly they can use the {@link Closeable#close()} method for
+ * de-initialization.</p>
+ *
+ * <p><code>Reducer</code> has 3 primary phases:</p>
+ * <ol>
+ * <li>
+ *
+ * <h4 id="Shuffle">Shuffle</h4>
+ *
+ * <p>The input to the <code>Reducer</code> is the grouped output of the
+ * {@link Mapper}s. In this phase the framework, for each
+ * <code>Reducer</code>, fetches the relevant partition of the output of all
+ * the <code>Mapper</code>s, via HTTP.
+ * </p>
+ * </li>
+ *
+ * <li>
+ * <h4 id="Sort">Sort</h4>
+ *
+ * <p>The framework groups <code>Reducer</code> inputs by <code>key</code>s
+ * (since different <code>Mapper</code>s may have output the same key) in this
+ * stage.</p>
+ *
+ * <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
+ * being fetched they are merged.</p>
+ *
+ * <h5 id="SecondarySort">SecondarySort</h5>
+ *
+ * <p>If the equivalence rules for grouping the intermediate keys are
+ * different from those for grouping keys before reduction, then one may
+ * specify a <code>Comparator</code> via
+ * {@link JobConf#setOutputValueGroupingComparator(Class)}. Since
+ * {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to
+ * control how intermediate keys are sorted, these can be used in conjunction
+ * to simulate <i>secondary sort on values</i>.</p>
+ *
+ * <p>For example, say that you want to find duplicate web pages and tag them
+ * all with the url of the "best" known example. You would set up the job
+ * like:</p>
+ * <ul>
+ * <li>Map Input Key: url</li>
+ * <li>Map Input Value: document</li>
+ * <li>Map Output Key: document checksum, url pagerank</li>
+ * <li>Map Output Value: url</li>
+ * <li>Partitioner: by checksum</li>
+ * <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
+ * <li>OutputValueGroupingComparator: by checksum</li>
+ * </ul>
+ * </li>
+ *
+ * <li>
+ * <h4 id="Reduce">Reduce</h4>
+ *
+ * <p>In this phase the
+ * {@link #reduce(WritableComparable, Iterator, OutputCollector, Reporter)}
+ * method is called for each <code><key, (list of values)></code> pair in
+ * the grouped inputs.</p>
+ * <p>The output of the reduce task is typically written to the
+ * {@link FileSystem} via
+ * {@link OutputCollector#collect(WritableComparable, Writable)}.</p>
+ * </li>
+ * </ol>
+ *
+ * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
+ *
+ * <p>Example:</p>
+ * <p><blockquote><pre>
+ * public class MyReducer<K extends WritableComparable, V extends Writable>
+ * extends MapReduceBase implements Reducer<K, V, K, V> {
+ *
+ * static enum MyCounters { NUM_RECORDS }
+ *
+ * private String reduceTaskId;
+ * private int noKeys = 0;
+ *
+ * public void configure(JobConf job) {
+ * reduceTaskId = job.get("mapred.task.id");
+ * }
+ *
+ * public void reduce(K key, Iterator<V> values,
+ * OutputCollector<K, V> output,
+ * Reporter reporter)
+ * throws IOException {
+ *
+ * // Process
+ * int noValues = 0;
+ * while (values.hasNext()) {
+ * V value = values.next();
+ *
+ * // Increment the no. of values for this key
+ * ++noValues;
+ *
+ * // Process the <key, value> pair (assume this takes a while)
+ * // ...
+ * // ...
+ *
+ * // Let the framework know that we are alive, and kicking!
+ * if ((noValues%10) == 0) {
+ * reporter.progress();
+ * }
+ *
+ * // Process some more
+ * // ...
+ * // ...
+ *
+ * // Output the <key, value>
+ * output.collect(key, value);
+ * }
+ *
+ * // Increment the no. of <key, list of values> pairs processed
+ * ++noKeys;
+ *
+ * // Increment counters
+ * reporter.incrCounter(MyCounters.NUM_RECORDS, 1);
+ *
+ * // Every 100 keys update application-level status
+ * if ((noKeys%100) == 0) {
+ * reporter.setStatus(reduceTaskId + " processed " + noKeys);
+ * }
+ * }
+ * }
+ * </pre></blockquote></p>
+ *
+ * @see Mapper
+ * @see Partitioner
+ * @see Reporter
+ * @see MapReduceBase
+ */
public interface Reducer<K2 extends WritableComparable, V2 extends Writable,
K3 extends WritableComparable, V3 extends Writable>
extends JobConfigurable, Closeable {
- /** Combines values for a given key. Output values must be of the same type
- * as input values. Input keys must not be altered. Typically all values
- * are combined into zero or one value. Output pairs are collected with
- * calls to {@link OutputCollector#collect(WritableComparable,Writable)}.
+ /**
+ * <i>Reduces</i> values for a given key.
+ *
+ * <p>The framework calls this method for each
+ * <code><key, (list of values)></code> pair in the grouped inputs.
+ * Output values must be of the same type as input values. Input keys must
+ * not be altered. Typically all values are combined into zero or one value.
+ * </p>
+ *
+ * <p>Output pairs are collected with calls to
+ * {@link OutputCollector#collect(WritableComparable,Writable)}.</p>
*
- * @param key the key
- * @param values the values to combine
- * @param output to collect combined values
+ * <p>Applications can use the {@link Reporter} provided to report progress
+ * or just indicate that they are alive. In scenarios where the application
+ * takes a significant amount of time to process individual key/value
+ * pairs, this is crucial since the framework might assume that the task has
+ * timed-out and kill that task. The other way of avoiding this is to set
+ * <a href="{@docRoot}/../hadoop-default.html#mapred.task.timeout">
+ * mapred.task.timeout</a> to a high-enough value (or even zero for no
+ * time-outs).</p>
+ *
+ * @param key the key.
+ * @param values the list of values to reduce.
+ * @param output to collect keys and combined values.
+ * @param reporter facility to report progress.
*/
void reduce(K2 key, Iterator<V2> values,
OutputCollector<K3, V3> output, Reporter reporter)
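
The secondary-sort recipe in the class javadoc above reduces to three JobConf
calls. A sketch, where ChecksumPartitioner, ChecksumPagerankComparator and
ChecksumComparator stand for hypothetical application-supplied classes:

  JobConf job = new JobConf(new Configuration(), MyJob.class);

  // Sort map outputs by checksum, then by decreasing pagerank ...
  job.setOutputKeyComparatorClass(ChecksumPagerankComparator.class);
  // ... but group the values for a single reduce() call by checksum alone,
  // so the first value seen in each group belongs to the "best" url.
  job.setOutputValueGroupingComparator(ChecksumComparator.class);
  // Route all keys with the same checksum to the same reduce task.
  job.setPartitionerClass(ChecksumPartitioner.class);
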
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Reporter.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/Reporter.java Wed Oct 24 14:24:51 2007
@@ -18,11 +18,24 @@
package org.apache.hadoop.mapred;
-import java.io.IOException;
-
import org.apache.hadoop.util.Progressable;
-/** Passed to application code to permit alteration of status. */
+/**
+ * A facility for Map-Reduce applications to report progress and update
+ * counters, status information etc.
+ *
+ * <p>{@link Mapper} and {@link Reducer} can use the <code>Reporter</code>
+ * provided to report progress or just indicate that they are alive. In
+ * scenarios where the application takes a significant amount of time to
+ * process individual key/value pairs, this is crucial since the framework
+ * might assume that the task has timed-out and kill that task.
+ *
+ * <p>Applications can also update {@link Counters} via the provided
+ * <code>Reporter</code>.</p>
+ *
+ * @see Progressable
+ * @see Counters
+ */
public interface Reporter extends Progressable {
/**
@@ -41,25 +54,27 @@
};
/**
- * Alter the application's status description.
+ * Set the status description for the task.
*
- * @param status
- * a brief description of the current status
+ * @param status brief description of the current status.
*/
public abstract void setStatus(String status);
/**
* Increments the counter identified by the key, which can be of
- * any enum type, by the specified amount.
- * @param key A value of any enum type
+ * any {@link Enum} type, by the specified amount.
+ *
+ * @param key key to identify the counter to be incremented. The key can
+ * be any <code>Enum</code>.
* @param amount A non-negative amount by which the counter is to
- * be incremented
+ * be incremented.
*/
public abstract void incrCounter(Enum key, long amount);
/**
- * Get the InputSplit object for a map.
- * @return the input split that the map is reading from
+ * Get the {@link InputSplit} object for a map.
+ *
+ * @return the <code>InputSplit</code> that the map is reading from.
* @throws UnsupportedOperationException if called outside a mapper
*/
public abstract InputSplit getInputSplit()
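
A sketch of typical Reporter usage from inside a map() implementation; the
MyCounters enum and the isMalformed() check are hypothetical:

  static enum MyCounters { MALFORMED_RECORDS }

  public void map(K key, V value,
                  OutputCollector<K, V> output, Reporter reporter)
    throws IOException {
    if (isMalformed(value)) {
      // Count, but otherwise skip, bad records.
      reporter.incrCounter(MyCounters.MALFORMED_RECORDS, 1);
      return;
    }
    reporter.setStatus("processing " + key);   // shows up in the web UI
    output.collect(key, value);
  }
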
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/RunningJob.java Wed Oct 24 14:24:51 2007
@@ -21,78 +21,120 @@
import java.io.*;
/**
- * Includes details on a running MapReduce job. A client can
- * track a living job using this object.
+ * <code>RunningJob</code> is the user-interface to query for details on a
+ * running Map-Reduce job.
+ *
+ * <p>Clients can get hold of <code>RunningJob</code> via the {@link JobClient}
+ * and then query the running-job for details such as name, configuration,
+ * progress etc.</p>
+ *
+ * @see JobClient
*/
public interface RunningJob {
/**
- * Returns an identifier for the job
+ * Get the job identifier.
+ *
+ * @return the job identifier.
*/
public String getJobID();
/**
- * Returns the name of the job
+ * Get the name of the job.
+ *
+ * @return the name of the job.
*/
public String getJobName();
/**
- * Returns the path of the submitted job.
+ * Get the path of the submitted job configuration.
+ *
+ * @return the path of the submitted job configuration.
*/
public String getJobFile();
/**
- * Returns a URL where some job progress information will be displayed.
+ * Get the URL where some job progress information will be displayed.
+ *
+ * @return the URL where some job progress information will be displayed.
*/
public String getTrackingURL();
/**
- * Returns a float between 0.0 and 1.0, indicating progress on
- * the map portion of the job. When all map tasks have completed,
- * the function returns 1.0.
+ * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
+ * and 1.0. When all map tasks have completed, the function returns 1.0.
+ *
+ * @return the progress of the job's map-tasks.
+ * @throws IOException
*/
public float mapProgress() throws IOException;
/**
- * Returns a float between 0.0 and 1.0, indicating progress on
- * the reduce portion of the job. When all reduce tasks have completed,
- * the function returns 1.0.
+ * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
+ * and 1.0. When all reduce tasks have completed, the function returns 1.0.
+ *
+ * @return the progress of the job's reduce-tasks.
+ * @throws IOException
*/
public float reduceProgress() throws IOException;
/**
- * Non-blocking function to check whether the job is finished or not.
+ * Check if the job is finished or not.
+ * This is a non-blocking call.
+ *
+ * @return <code>true</code> if the job is complete, else <code>false</code>.
+ * @throws IOException
*/
public boolean isComplete() throws IOException;
/**
- * True iff job completed successfully.
+ * Check if the job completed successfully.
+ *
+ * @return <code>true</code> if the job succeeded, else <code>false</code>.
+ * @throws IOException
*/
public boolean isSuccessful() throws IOException;
/**
* Blocks until the job is complete.
+ *
+ * @throws IOException
*/
public void waitForCompletion() throws IOException;
/**
* Kill the running job. Blocks until all job tasks have been
* killed as well. If the job is no longer running, it simply returns.
+ *
+ * @throws IOException
*/
public void killJob() throws IOException;
- public TaskCompletionEvent[] getTaskCompletionEvents(
- int startFrom) throws IOException;
+ /**
+ * Get events indicating completion (success/failure) of component tasks.
+ *
+ * @param startFrom index to start fetching events from.
+ * @return an array of {@link TaskCompletionEvent}s.
+ * @throws IOException
+ */
+ public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom)
+ throws IOException;
/**
* Kill indicated task attempt.
- * @param taskId the id of the task to kill.
- * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
- * it is just killed, w/o affecting job failure status.
+ *
+ * @param taskId the id of the task to be terminated.
+ * @param shouldFail if true the task is failed and added to failed tasks
+ * list, otherwise it is just killed, w/o affecting
+ * job failure status.
+ * @throws IOException
*/
public void killTask(String taskId, boolean shouldFail) throws IOException;
/**
* Gets the counters for this job.
+ *
+ * @return the counters for this job.
+ * @throws IOException
*/
public Counters getCounters() throws IOException;
}
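
A sketch of the polling pattern this interface enables, as a non-blocking
alternative to JobClient.runJob(job); 'job' is assumed to be a
fully-configured JobConf and exception handling is elided:

  JobClient client = new JobClient(job);
  RunningJob running = client.submitJob(job);

  while (!running.isComplete()) {
    System.out.println("map: " + running.mapProgress() +
                       " reduce: " + running.reduceProgress());
    Thread.sleep(5000);                      // poll every 5 seconds
  }

  if (!running.isSuccessful()) {
    System.err.println("Job " + running.getJobID() + " failed!");
  }
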
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/package.html
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/package.html?rev=588035&r1=588034&r2=588035&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/package.html (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/package.html Wed Oct 24 14:24:51 2007
@@ -1,16 +1,212 @@
<html>
<body>
-<p>A system for scalable, fault-tolerant, distributed computation over
-large data collections.</p>
+<p>A software framework for easily writing applications which process vast
+amounts of data (multi-terabyte data-sets) in parallel on large clusters
+(thousands of nodes) built of commodity hardware in a reliable, fault-tolerant
+manner.</p>
-<p>Applications implement {@link org.apache.hadoop.mapred.Mapper} and
-{@link org.apache.hadoop.mapred.Reducer} interfaces. These are submitted
-as a {@link org.apache.hadoop.mapred.JobConf} and are applied to data
-stored in a {@link org.apache.hadoop.fs.FileSystem}.</p>
+<p>A Map-Reduce <i>job</i> usually splits the input data-set into independent
+chunks which are processed by <i>map</i> tasks in a completely parallel manner,
+followed by <i>reduce</i> tasks which aggregate their output. Typically both
+the input and the output of the job are stored in a
+{@link org.apache.hadoop.fs.FileSystem}. The framework takes care of monitoring
+tasks and re-executing failed ones. Since, usually, the compute nodes and the
+storage nodes are the same i.e. Hadoop's Map-Reduce framework and Distributed
+FileSystem are running on the same set of nodes, tasks are effectively scheduled
+on the nodes where data is already present, resulting in very high aggregate
+bandwidth across the cluster.</p>
-<p>See <a href="http://labs.google.com/papers/mapreduce.html">Google's
-original Map/Reduce paper</a> for background information.</p>
+<p>The Map-Reduce framework operates exclusively on <tt><key, value></tt>
+pairs i.e. the input to the job is viewed as a set of <tt><key, value></tt>
+pairs and the output as another, possibly different, set of
+<tt><key, value></tt> pairs. The <tt>key</tt>s and <tt>value</tt>s have to
+be serializable as {@link org.apache.hadoop.io.Writable}s and additionally the
+<tt>key</tt>s have to be {@link org.apache.hadoop.io.WritableComparable}s in
+order to facilitate grouping by the framework.</p>
+
+<p>Data flow:</p>
+<pre>
+ (input)
+ <tt><k1, v1></tt>
+
+ |
+ V
+
+ <b>map</b>
+
+ |
+ V
+
+ <tt><k2, v2></tt>
+
+ |
+ V
+
+ <b>combine</b>
+
+ |
+ V
+
+ <tt><k2, v2></tt>
+
+ |
+ V
+
+ <b>reduce</b>
+
+ |
+ V
+
+ <tt><k3, v3></tt>
+ (output)
+</pre>
+
+<p>Applications typically implement
+{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)}
+and
+{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)}
+methods. The application-writer also specifies various facets of the job such
+as input and output locations, the <tt>Partitioner</tt>, <tt>InputFormat</tt>
+& <tt>OutputFormat</tt> implementations to be used etc. as
+a {@link org.apache.hadoop.mapred.JobConf}. The client program,
+{@link org.apache.hadoop.mapred.JobClient}, then submits the job to the framework
+and optionally monitors it.</p>
+
+<p>The framework spawns one map task per
+{@link org.apache.hadoop.mapred.InputSplit} generated by the
+{@link org.apache.hadoop.mapred.InputFormat} of the job and calls
+{@link org.apache.hadoop.mapred.Mapper#map(WritableComparable, Writable, OutputCollector, Reporter)}
+with each <key, value> pair read by the
+{@link org.apache.hadoop.mapred.RecordReader} from the <tt>InputSplit</tt> for
+the task. The intermediate outputs of the maps are then grouped by <tt>key</tt>s
+and optionally aggregated by the <i>combiner</i>. The key space of intermediate
+outputs is partitioned by the {@link org.apache.hadoop.mapred.Partitioner}, where
+the number of partitions is exactly the number of reduce tasks for the job.</p>
+
+<p>The reduce tasks fetch the sorted intermediate outputs of the maps, via HTTP,
+merge the <key, value> pairs and call
+{@link org.apache.hadoop.mapred.Reducer#reduce(WritableComparable, Iterator, OutputCollector, Reporter)}
+for each <key, list of values> pair. The output of the reduce tasks is
+stored on the <tt>FileSystem</tt> by the
+{@link org.apache.hadoop.mapred.RecordWriter} provided by the
+{@link org.apache.hadoop.mapred.OutputFormat} of the job.</p>
+
+<p>Map-Reduce application to perform a distributed <i>grep</i>:</p>
+<pre><tt>
+public class Grep extends Configured implements Tool {
+
+ // <i>map: Search for the pattern specified by 'grep.mapper.regex' &</i>
+ // <i>'grep.mapper.regex.group'</i>
+
+ static class GrepMapper<K extends WritableComparable>
+ extends MapReduceBase implements Mapper<K, Text, Text, LongWritable> {
+
+ private Pattern pattern;
+ private int group;
+
+ public void configure(JobConf job) {
+ pattern = Pattern.compile(job.get("grep.mapper.regex"));
+ group = job.getInt("grep.mapper.regex.group", 0);
+ }
+
+ public void map(K key, Text value,
+ OutputCollector<Text, LongWritable> output,
+ Reporter reporter)
+ throws IOException {
+ String text = value.toString();
+ Matcher matcher = pattern.matcher(text);
+ while (matcher.find()) {
+ output.collect(new Text(matcher.group(group)), new LongWritable(1));
+ }
+ }
+ }
+
+ // <i>reduce: Count the number of occurrences of the pattern</i>
+
+ static class GrepReducer<K extends WritableComparable> extends MapReduceBase
+ implements Reducer<K, LongWritable, K, LongWritable> {
+
+ public void reduce(K key, Iterator<LongWritable> values,
+ OutputCollector<K, LongWritable> output,
+ Reporter reporter)
+ throws IOException {
+
+ // sum all values for this key
+ long sum = 0;
+ while (values.hasNext()) {
+ sum += values.next().get();
+ }
+
+ // output sum
+ output.collect(key, new LongWritable(sum));
+ }
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 3) {
+ System.out.println("Grep <inDir> <outDir> <regex> [<group>]");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ }
+
+ JobConf grepJob = new JobConf(getConf(), Grep.class);
+
+ grepJob.setJobName("grep");
+
+ grepJob.setInputPath(new Path(args[0]));
+ grepJob.setOutputPath(new Path(args[1]));
+
+ grepJob.setMapperClass(GrepMapper.class);
+ grepJob.setCombinerClass(GrepReducer.class);
+ grepJob.setReducerClass(GrepReducer.class);
+
+ grepJob.set("mapred.mapper.regex", args[2]);
+ if (args.length == 4)
+ grepJob.set("mapred.mapper.regex.group", args[3]);
+
+ grepJob.setOutputFormat(SequenceFileOutputFormat.class);
+ grepJob.setOutputKeyClass(Text.class);
+ grepJob.setOutputValueClass(LongWritable.class);
+
+ JobClient.runJob(grepJob);
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new Grep(), args);
+ System.exit(res);
+ }
+
+}
+</tt></pre>
+
+<p>Notice how the data-flow of the above grep job is very similar to doing the
+same via the unix pipeline:</p>
+
+<pre>
+cat input/* | grep | sort | uniq -c > out
+</pre>
+
+<pre>
+ input | map | shuffle | reduce > out
+</pre>
+
+<p>Hadoop Map-Reduce applications need not be written in
+Java<small><sup>TM</sup></small> only.
+<a href="../streaming/package-summary.html">Hadoop Streaming</a> is a utility
+which allows users to create and run jobs with any executables (e.g. shell
+utilities) as the mapper and/or the reducer.
+<a href="pipes/package-summary.html">Hadoop Pipes</a> is a
+<a href="http://www.swig.org/">SWIG</a>-compatible <em>C++ API</em> to implement
+Map-Reduce applications (non JNI<small><sup>TM</sup></small> based).</p>
+
+<p>See <a href="http://labs.google.com/papers/mapreduce.html">Google's original
+Map/Reduce paper</a> for background information.</p>
+
+<p><i>Java and JNI are trademarks or registered trademarks of
+Sun Microsystems, Inc. in the United States and other countries.</i></p>
</body>
</html>