You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "Doug Cutting (JIRA)" <ji...@apache.org> on 2006/04/25 01:10:23 UTC
[jira] Updated: (HADOOP-20) Mapper, Reducer need an occasion to
clean up after the last record is processed.
[ http://issues.apache.org/jira/browse/HADOOP-20?page=all ]
Doug Cutting updated HADOOP-20:
-------------------------------
Fix Version: 0.1.0
> Mapper, Reducer need an occasion to clean up after the last record is processed.
> -------------------------------------------------------------------------------
>
> Key: HADOOP-20
> URL: http://issues.apache.org/jira/browse/HADOOP-20
> Project: Hadoop
> Type: Improvement
> Components: mapred
> Environment: Linux
> Reporter: Michel Tourn
> Fix For: 0.1.0
> Attachments: mapredfinished.log, mrclose.patch
>
> Mapper, Reducer need an occasion to do some cleanup after the last record is processed.
> Proposal (patch attached)
> in interface Mapper:
> add method void finished();
> in interface Reducer:
> add method void finished();
> finished() methods are called from MapTask, CombiningCollector, ReduceTask.
> ------------
> Known limitation: Fetcher (a multithreaded MapRunnable) does not call finished().
> This is not currently a problem because fetcher Map/Reduce modules do not do anything in finished().
> The right way to add finished() support to Fetcher would be to wait for all threads to finish,
> then do:
> if (collector instanceof CombiningCollector) ((CombiningCollector)collector).finished();
> ------------
> patch begins: (svn trunk)
> Index: src/test/org/apache/nutch/mapred/MapredLoadTest.java
> ===================================================================
> --- src/test/org/apache/nutch/mapred/MapredLoadTest.java (revision 374781)
> +++ src/test/org/apache/nutch/mapred/MapredLoadTest.java (working copy)
> @@ -69,6 +69,8 @@
> out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
> }
> }
> + public void finished() {
> + }
> }
> static class RandomGenReducer implements Reducer {
> public void configure(JobConf job) {
> @@ -81,6 +83,8 @@
> out.collect(new UTF8("" + val), new UTF8(""));
> }
> }
> + public void finished() {
> + }
> }
> static class RandomCheckMapper implements Mapper {
> public void configure(JobConf job) {
> @@ -92,6 +96,8 @@
>
> out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
> }
> + public void finished() {
> + }
> }
> static class RandomCheckReducer implements Reducer {
> public void configure(JobConf job) {
> @@ -106,6 +112,8 @@
> }
> out.collect(new IntWritable(keyint), new IntWritable(count));
> }
> + public void finished() {
> + }
> }
>
> int range;
> Index: src/test/org/apache/nutch/fs/TestNutchFileSystem.java
> ===================================================================
> --- src/test/org/apache/nutch/fs/TestNutchFileSystem.java (revision 374783)
> +++ src/test/org/apache/nutch/fs/TestNutchFileSystem.java (working copy)
> @@ -155,6 +155,8 @@
>
> reporter.setStatus("wrote " + name);
> }
> +
> + public void finished() {}
> }
>
> public static void writeTest(NutchFileSystem fs, boolean fastCheck)
> @@ -247,6 +249,9 @@
>
> reporter.setStatus("read " + name);
> }
> +
> + public void finished() {}
> +
> }
>
> public static void readTest(NutchFileSystem fs, boolean fastCheck)
> @@ -339,6 +344,9 @@
> in.close();
> }
> }
> +
> + public void finished() {}
> +
> }
>
> public static void seekTest(NutchFileSystem fs, boolean fastCheck)
> Index: src/java/org/apache/nutch/indexer/DeleteDuplicates.java
> ===================================================================
> --- src/java/org/apache/nutch/indexer/DeleteDuplicates.java (revision 374776)
> +++ src/java/org/apache/nutch/indexer/DeleteDuplicates.java (working copy)
> @@ -225,6 +225,7 @@
> }
> }
> }
> + public void finished() {}
> }
>
> private NutchFileSystem fs;
> @@ -265,6 +266,8 @@
> reader.close();
> }
> }
> +
> + public void finished() {}
>
> /** Write nothing. */
> public RecordWriter getRecordWriter(final NutchFileSystem fs,
> Index: src/java/org/apache/nutch/indexer/Indexer.java
> ===================================================================
> --- src/java/org/apache/nutch/indexer/Indexer.java (revision 374778)
> +++ src/java/org/apache/nutch/indexer/Indexer.java (working copy)
> @@ -227,6 +227,8 @@
>
> output.collect(key, new ObjectWritable(doc));
> }
> +
> + public void finished() {}
>
> public void index(File indexDir, File crawlDb, File linkDb, File[] segments)
> throws IOException {
> Index: src/java/org/apache/nutch/segment/SegmentReader.java
> ===================================================================
> --- src/java/org/apache/nutch/segment/SegmentReader.java (revision 374778)
> +++ src/java/org/apache/nutch/segment/SegmentReader.java (working copy)
> @@ -143,7 +143,9 @@
> }
> output.collect(key, new ObjectWritable(dump.toString()));
> }
> -
> +
> + public void finished() {}
> +
> public void reader(File segment) throws IOException {
> LOG.info("Reader: segment: " + segment);
>
> Index: src/java/org/apache/nutch/mapred/Mapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/Mapper.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/Mapper.java (working copy)
> @@ -39,4 +39,9 @@
> void map(WritableComparable key, Writable value,
> OutputCollector output, Reporter reporter)
> throws IOException;
> +
> + /** Called after the last {@link #map} call on this Mapper object.
> + Typical implementations do nothing.
> + */
> + void finished();
> }
> Index: src/java/org/apache/nutch/mapred/lib/RegexMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/RegexMapper.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/RegexMapper.java (working copy)
> @@ -53,4 +53,5 @@
> output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
> }
> }
> + public void finished() {}
> }
> Index: src/java/org/apache/nutch/mapred/lib/InverseMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/InverseMapper.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/InverseMapper.java (working copy)
> @@ -38,4 +38,6 @@
> throws IOException {
> output.collect((WritableComparable)value, key);
> }
> +
> + public void finished() {}
> }
> Index: src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/IdentityReducer.java (working copy)
> @@ -42,4 +42,5 @@
> }
> }
>
> + public void finished() {}
> }
> Index: src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/IdentityMapper.java (working copy)
> @@ -39,4 +39,5 @@
> output.collect(key, val);
> }
>
> + public void finished() {}
> }
> Index: src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/LongSumReducer.java (working copy)
> @@ -47,4 +47,6 @@
> // output sum
> output.collect(key, new LongWritable(sum));
> }
> +
> + public void finished() {}
> }
> Index: src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java (working copy)
> @@ -50,4 +50,6 @@
> output.collect(new UTF8(st.nextToken()), new LongWritable(1));
> }
> }
> +
> + public void finished() {}
> }
> Index: src/java/org/apache/nutch/mapred/ReduceTask.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/ReduceTask.java (revision 374781)
> +++ src/java/org/apache/nutch/mapred/ReduceTask.java (working copy)
> @@ -275,6 +275,7 @@
> }
>
> } finally {
> + reducer.finished();
> in.close();
> lfs.delete(new File(sortedFile)); // remove sorted
> out.close(reporter);
> Index: src/java/org/apache/nutch/mapred/MapTask.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/MapTask.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/MapTask.java (working copy)
> @@ -50,7 +50,7 @@
> public void write(DataOutput out) throws IOException {
> super.write(out);
> split.write(out);
> -
> +
> }
> public void readFields(DataInput in) throws IOException {
> super.readFields(in);
> @@ -126,6 +126,10 @@
> }
>
> } finally {
> + if (combining) {
> + ((CombiningCollector)collector).finished();
> + }
> +
> in.close(); // close input
> }
> } finally {
> @@ -147,5 +151,5 @@
> public NutchConf getConf() {
> return this.nutchConf;
> }
> -
> +
> }
> Index: src/java/org/apache/nutch/mapred/MapRunner.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/MapRunner.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/MapRunner.java (working copy)
> @@ -38,18 +38,22 @@
> public void run(RecordReader input, OutputCollector output,
> Reporter reporter)
> throws IOException {
> - while (true) {
> - // allocate new key & value instances
> - WritableComparable key =
> - (WritableComparable)job.newInstance(inputKeyClass);
> - Writable value = (Writable)job.newInstance(inputValueClass);
> + try {
> + while (true) {
> + // allocate new key & value instances
> + WritableComparable key =
> + (WritableComparable)job.newInstance(inputKeyClass);
> + Writable value = (Writable)job.newInstance(inputValueClass);
>
> - // read next key & value
> - if (!input.next(key, value))
> - return;
> + // read next key & value
> + if (!input.next(key, value))
> + return;
>
> - // map pair to output
> - mapper.map(key, value, output, reporter);
> + // map pair to output
> + mapper.map(key, value, output, reporter);
> + }
> + } finally {
> + mapper.finished();
> }
> }
>
> Index: src/java/org/apache/nutch/mapred/CombiningCollector.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/CombiningCollector.java (revision 374780)
> +++ src/java/org/apache/nutch/mapred/CombiningCollector.java (working copy)
> @@ -78,4 +78,9 @@
> count = 0;
> }
>
> + public synchronized void finished()
> + {
> + combiner.finished();
> + }
> +
> }
> Index: src/java/org/apache/nutch/mapred/Reducer.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/Reducer.java (revision 374737)
> +++ src/java/org/apache/nutch/mapred/Reducer.java (working copy)
> @@ -38,4 +38,10 @@
> void reduce(WritableComparable key, Iterator values,
> OutputCollector output, Reporter reporter)
> throws IOException;
> +
> + /** Called after the last {@link #reduce} call on this Reducer object.
> + Typical implementations do nothing.
> + */
> + void finished();
> +
> }
> Index: src/java/org/apache/nutch/crawl/CrawlDbReader.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/CrawlDbReader.java (revision 374737)
> +++ src/java/org/apache/nutch/crawl/CrawlDbReader.java (working copy)
> @@ -50,9 +50,9 @@
>
> /**
> * Read utility for the CrawlDB.
> - *
> + *
> * @author Andrzej Bialecki
> - *
> + *
> */
> public class CrawlDbReader {
>
> @@ -68,6 +68,7 @@
> output.collect(new UTF8("retry"), new LongWritable(cd.getRetriesSinceFetch()));
> output.collect(new UTF8("score"), new LongWritable((long) (cd.getScore() * 1000.0)));
> }
> + public void finished() {}
> }
>
> public static class CrawlDbStatReducer implements Reducer {
> @@ -121,6 +122,7 @@
> output.collect(new UTF8("avg score"), new LongWritable(total / cnt));
> }
> }
> + public void finished() {}
> }
>
> public static class CrawlDbDumpReducer implements Reducer {
> @@ -133,8 +135,11 @@
>
> public void configure(JobConf job) {
> }
> +
> + public void finished() {
> + }
> }
> -
> +
> public void processStatJob(String crawlDb, NutchConf config) throws IOException {
> LOG.info("CrawlDb statistics start: " + crawlDb);
> File tmpFolder = new File(crawlDb, "stat_tmp" + System.currentTimeMillis());
> @@ -219,7 +224,7 @@
> System.out.println("not found");
> }
> }
> -
> +
> public void processDumpJob(String crawlDb, String output, NutchConf config) throws IOException {
>
> LOG.info("CrawlDb dump: starting");
> @@ -270,4 +275,5 @@
> }
> return;
> }
> +
> }
> Index: src/java/org/apache/nutch/crawl/LinkDb.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/LinkDb.java (revision 374779)
> +++ src/java/org/apache/nutch/crawl/LinkDb.java (working copy)
> @@ -118,7 +118,8 @@
> output.collect(key, result);
> }
>
> -
> + public void finished() {}
> +
> public void invert(File linkDb, File segmentsDir) throws IOException {
> LOG.info("LinkDb: starting");
> LOG.info("LinkDb: linkdb: " + linkDb);
> Index: src/java/org/apache/nutch/crawl/Injector.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/Injector.java (revision 374779)
> +++ src/java/org/apache/nutch/crawl/Injector.java (working copy)
> @@ -65,6 +65,8 @@
> interval));
> }
> }
> +
> + public void finished() {}
> }
>
> /** Combine multiple new entries for a url. */
> @@ -76,6 +78,7 @@
> throws IOException {
> output.collect(key, (Writable)values.next()); // just collect first value
> }
> + public void finished() {}
> }
>
> /** Construct an Injector. */
> Index: src/java/org/apache/nutch/crawl/Generator.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/Generator.java (revision 374779)
> +++ src/java/org/apache/nutch/crawl/Generator.java (working copy)
> @@ -63,6 +63,8 @@
> output.collect(crawlDatum, key); // invert for sort by score
> }
>
> + public void finished() {}
> +
> /** Partition by host (value). */
> public int getPartition(WritableComparable key, Writable value,
> int numReduceTasks) {
> Index: src/java/org/apache/nutch/crawl/CrawlDbReducer.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/CrawlDbReducer.java (revision 374781)
> +++ src/java/org/apache/nutch/crawl/CrawlDbReducer.java (working copy)
> @@ -115,4 +115,5 @@
> }
> }
>
> + public void finished() {}
> }
> Index: src/java/org/apache/nutch/parse/ParseSegment.java
> ===================================================================
> --- src/java/org/apache/nutch/parse/ParseSegment.java (revision 374776)
> +++ src/java/org/apache/nutch/parse/ParseSegment.java (working copy)
> @@ -78,6 +78,8 @@
> throws IOException {
> output.collect(key, (Writable)values.next()); // collect first value
> }
> +
> + public void finished() {}
>
> public void parse(File segment) throws IOException {
> LOG.info("Parse: starting");
--
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
http://www.atlassian.com/software/jira