You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "Doug Cutting (JIRA)" <ji...@apache.org> on 2006/04/25 01:12:15 UTC

[jira] Closed: (HADOOP-20) Mapper, Reducer need an occasion to cleanup after the last record is processed.

     [ http://issues.apache.org/jira/browse/HADOOP-20?page=all ]
     
Doug Cutting closed HADOOP-20:
------------------------------


> Mapper, Reducer need an occasion to cleanup after the last record is processed.
> -------------------------------------------------------------------------------
>
>          Key: HADOOP-20
>          URL: http://issues.apache.org/jira/browse/HADOOP-20
>      Project: Hadoop
>         Type: Improvement

>   Components: mapred
>  Environment: Linux
>     Reporter: Michel Tourn
>      Fix For: 0.1.0
>  Attachments: mapredfinished.log, mrclose.patch
>
> Mapper and Reducer need an opportunity to do some cleanup after the last record is processed.
> Proposal (patch attached):
> in interface Mapper:
>  add method void finished();
> in interface Reducer:
>  add method void finished();
> finished() methods are called from MapTask, CombiningCollector, ReduceTask.
> ------------
> Known limitation: Fetcher (a multithreaded MapRunnable) does not call finished().
> This is not currently a problem because the Fetcher's Map/Reduce modules do not do anything in finished().
> The right way to add finished() support to Fetcher would be to wait for all threads to finish, 
> then do:
>      if (collector instanceof CombiningCollector) ((CombiningCollector)collector).finished();
> ------------
> patch begins: (svn trunk)
> Index: src/test/org/apache/nutch/mapred/MapredLoadTest.java
> ===================================================================
> --- src/test/org/apache/nutch/mapred/MapredLoadTest.java	(revision 374781)
> +++ src/test/org/apache/nutch/mapred/MapredLoadTest.java	(working copy)
> @@ -69,6 +69,8 @@
>                  out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
>              }
>          }
> +        public void finished() {
> +        }
>      }
>      static class RandomGenReducer implements Reducer {
>          public void configure(JobConf job) {
> @@ -81,6 +83,8 @@
>                  out.collect(new UTF8("" + val), new UTF8(""));
>              }
>          }
> +        public void finished() {
> +        }
>      }
>      static class RandomCheckMapper implements Mapper {
>          public void configure(JobConf job) {
> @@ -92,6 +96,8 @@
>  
>              out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
>          }
> +        public void finished() {
> +        }
>      }
>      static class RandomCheckReducer implements Reducer {
>          public void configure(JobConf job) {
> @@ -106,6 +112,8 @@
>              }
>              out.collect(new IntWritable(keyint), new IntWritable(count));
>          }
> +        public void finished() {
> +        }
>      }
>  
>      int range;
> Index: src/test/org/apache/nutch/fs/TestNutchFileSystem.java
> ===================================================================
> --- src/test/org/apache/nutch/fs/TestNutchFileSystem.java	(revision 374783)
> +++ src/test/org/apache/nutch/fs/TestNutchFileSystem.java	(working copy)
> @@ -155,6 +155,8 @@
>  
>        reporter.setStatus("wrote " + name);
>      }
> +    
> +    public void finished() {}
>    }
>  
>    public static void writeTest(NutchFileSystem fs, boolean fastCheck)
> @@ -247,6 +249,9 @@
>  
>        reporter.setStatus("read " + name);
>      }
> +    
> +    public void finished() {}
> +    
>    }
>  
>    public static void readTest(NutchFileSystem fs, boolean fastCheck)
> @@ -339,6 +344,9 @@
>          in.close();
>        }
>      }
> +    
> +    public void finished() {}
> +    
>    }
>  
>    public static void seekTest(NutchFileSystem fs, boolean fastCheck)
> Index: src/java/org/apache/nutch/indexer/DeleteDuplicates.java
> ===================================================================
> --- src/java/org/apache/nutch/indexer/DeleteDuplicates.java	(revision 374776)
> +++ src/java/org/apache/nutch/indexer/DeleteDuplicates.java	(working copy)
> @@ -225,6 +225,7 @@
>          }
>        }
>      }
> +    public void finished() {}
>    }
>      
>    private NutchFileSystem fs;
> @@ -265,6 +266,8 @@
>        reader.close();
>      }
>    }
> +  
> +  public void finished() {}
>  
>    /** Write nothing. */
>    public RecordWriter getRecordWriter(final NutchFileSystem fs,
> Index: src/java/org/apache/nutch/indexer/Indexer.java
> ===================================================================
> --- src/java/org/apache/nutch/indexer/Indexer.java	(revision 374778)
> +++ src/java/org/apache/nutch/indexer/Indexer.java	(working copy)
> @@ -227,6 +227,8 @@
>  
>      output.collect(key, new ObjectWritable(doc));
>    }
> +  
> +  public void finished() {}
>  
>    public void index(File indexDir, File crawlDb, File linkDb, File[] segments)
>      throws IOException {
> Index: src/java/org/apache/nutch/segment/SegmentReader.java
> ===================================================================
> --- src/java/org/apache/nutch/segment/SegmentReader.java	(revision 374778)
> +++ src/java/org/apache/nutch/segment/SegmentReader.java	(working copy)
> @@ -143,7 +143,9 @@
>      }
>      output.collect(key, new ObjectWritable(dump.toString()));
>    }
> -
> +  
> +  public void finished() {}
> +  
>    public void reader(File segment) throws IOException {
>      LOG.info("Reader: segment: " + segment);
>  
> Index: src/java/org/apache/nutch/mapred/Mapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/Mapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/Mapper.java	(working copy)
> @@ -39,4 +39,9 @@
>    void map(WritableComparable key, Writable value,
>             OutputCollector output, Reporter reporter)
>      throws IOException;
> +
> +  /** Called after the last {@link #map} call on this Mapper object.
> +      Typical implementations do nothing.
> +  */
> +  void finished();
>  }
> Index: src/java/org/apache/nutch/mapred/lib/RegexMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/RegexMapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/RegexMapper.java	(working copy)
> @@ -53,4 +53,5 @@
>        output.collect(new UTF8(matcher.group(group)), new LongWritable(1));
>      }
>    }
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/InverseMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/InverseMapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/InverseMapper.java	(working copy)
> @@ -38,4 +38,6 @@
>      throws IOException {
>      output.collect((WritableComparable)value, key);
>    }
> +
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/IdentityReducer.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/IdentityReducer.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/IdentityReducer.java	(working copy)
> @@ -42,4 +42,5 @@
>      }
>    }
>  
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/IdentityMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/IdentityMapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/IdentityMapper.java	(working copy)
> @@ -39,4 +39,5 @@
>      output.collect(key, val);
>    }
>  
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/LongSumReducer.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/LongSumReducer.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/LongSumReducer.java	(working copy)
> @@ -47,4 +47,6 @@
>      // output sum
>      output.collect(key, new LongWritable(sum));
>    }
> +
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/lib/TokenCountMapper.java	(working copy)
> @@ -50,4 +50,6 @@
>        output.collect(new UTF8(st.nextToken()), new LongWritable(1));
>      }
>    }
> +
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/mapred/ReduceTask.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/ReduceTask.java	(revision 374781)
> +++ src/java/org/apache/nutch/mapred/ReduceTask.java	(working copy)
> @@ -275,6 +275,7 @@
>        }
>  
>      } finally {
> +      reducer.finished();
>        in.close();
>        lfs.delete(new File(sortedFile));           // remove sorted
>        out.close(reporter);
> Index: src/java/org/apache/nutch/mapred/MapTask.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/MapTask.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/MapTask.java	(working copy)
> @@ -50,7 +50,7 @@
>    public void write(DataOutput out) throws IOException {
>      super.write(out);
>      split.write(out);
> -    
> +
>    }
>    public void readFields(DataInput in) throws IOException {
>      super.readFields(in);
> @@ -126,6 +126,10 @@
>          }
>  
>        } finally {
> +        if (combining) {
> +          ((CombiningCollector)collector).finished();
> +        }
> +
>          in.close();                               // close input
>        }
>      } finally {
> @@ -147,5 +151,5 @@
>    public NutchConf getConf() {
>      return this.nutchConf;
>    }
> -  
> +
>  }
> Index: src/java/org/apache/nutch/mapred/MapRunner.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/MapRunner.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/MapRunner.java	(working copy)
> @@ -38,18 +38,22 @@
>    public void run(RecordReader input, OutputCollector output,
>                    Reporter reporter)
>      throws IOException {
> -    while (true) {
> -      // allocate new key & value instances
> -      WritableComparable key =
> -        (WritableComparable)job.newInstance(inputKeyClass);
> -      Writable value = (Writable)job.newInstance(inputValueClass);
> +    try {
> +      while (true) {
> +        // allocate new key & value instances
> +        WritableComparable key =
> +          (WritableComparable)job.newInstance(inputKeyClass);
> +        Writable value = (Writable)job.newInstance(inputValueClass);
>  
> -      // read next key & value
> -      if (!input.next(key, value))
> -        return;
> +        // read next key & value
> +        if (!input.next(key, value))
> +          return;
>  
> -      // map pair to output
> -      mapper.map(key, value, output, reporter);
> +        // map pair to output
> +        mapper.map(key, value, output, reporter);
> +      }
> +    } finally {
> +        mapper.finished();
>      }
>    }
>  
> Index: src/java/org/apache/nutch/mapred/CombiningCollector.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/CombiningCollector.java	(revision 374780)
> +++ src/java/org/apache/nutch/mapred/CombiningCollector.java	(working copy)
> @@ -78,4 +78,9 @@
>      count = 0;
>    }
>  
> +  public synchronized void finished()
> +  {
> +    combiner.finished();
> +  }
> +
>  }
> Index: src/java/org/apache/nutch/mapred/Reducer.java
> ===================================================================
> --- src/java/org/apache/nutch/mapred/Reducer.java	(revision 374737)
> +++ src/java/org/apache/nutch/mapred/Reducer.java	(working copy)
> @@ -38,4 +38,10 @@
>    void reduce(WritableComparable key, Iterator values,
>                OutputCollector output, Reporter reporter)
>      throws IOException;
> +
> +  /** Called after the last {@link #reduce} call on this Reducer object.
> +      Typical implementations do nothing.
> +  */
> +  void finished();
> +
>  }
> Index: src/java/org/apache/nutch/crawl/CrawlDbReader.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/CrawlDbReader.java	(revision 374737)
> +++ src/java/org/apache/nutch/crawl/CrawlDbReader.java	(working copy)
> @@ -50,9 +50,9 @@
>  
>  /**
>   * Read utility for the CrawlDB.
> - * 
> + *
>   * @author Andrzej Bialecki
> - * 
> + *
>   */
>  public class CrawlDbReader {
>  
> @@ -68,6 +68,7 @@
>        output.collect(new UTF8("retry"), new LongWritable(cd.getRetriesSinceFetch()));
>        output.collect(new UTF8("score"), new LongWritable((long) (cd.getScore() * 1000.0)));
>      }
> +    public void finished() {}
>    }
>  
>    public static class CrawlDbStatReducer implements Reducer {
> @@ -121,6 +122,7 @@
>          output.collect(new UTF8("avg score"), new LongWritable(total / cnt));
>        }
>      }
> +    public void finished() {}
>    }
>  
>    public static class CrawlDbDumpReducer implements Reducer {
> @@ -133,8 +135,11 @@
>  
>      public void configure(JobConf job) {
>      }
> +
> +    public void finished() {
> +    }
>    }
> -  
> +
>    public void processStatJob(String crawlDb, NutchConf config) throws IOException {
>      LOG.info("CrawlDb statistics start: " + crawlDb);
>      File tmpFolder = new File(crawlDb, "stat_tmp" + System.currentTimeMillis());
> @@ -219,7 +224,7 @@
>        System.out.println("not found");
>      }
>    }
> -  
> +
>    public void processDumpJob(String crawlDb, String output, NutchConf config) throws IOException {
>  
>      LOG.info("CrawlDb dump: starting");
> @@ -270,4 +275,5 @@
>      }
>      return;
>    }
> +
>  }
> Index: src/java/org/apache/nutch/crawl/LinkDb.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/LinkDb.java	(revision 374779)
> +++ src/java/org/apache/nutch/crawl/LinkDb.java	(working copy)
> @@ -118,7 +118,8 @@
>      output.collect(key, result);
>    }
>  
> -
> +  public void finished() {}
> +	
>    public void invert(File linkDb, File segmentsDir) throws IOException {
>      LOG.info("LinkDb: starting");
>      LOG.info("LinkDb: linkdb: " + linkDb);
> Index: src/java/org/apache/nutch/crawl/Injector.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/Injector.java	(revision 374779)
> +++ src/java/org/apache/nutch/crawl/Injector.java	(working copy)
> @@ -65,6 +65,8 @@
>                                               interval));
>        }
>      }
> +    
> +    public void finished() {}
>    }
>  
>    /** Combine multiple new entries for a url. */
> @@ -76,6 +78,7 @@
>        throws IOException {
>        output.collect(key, (Writable)values.next()); // just collect first value
>      }
> +    public void finished() {}
>    }
>  
>    /** Construct an Injector. */
> Index: src/java/org/apache/nutch/crawl/Generator.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/Generator.java	(revision 374779)
> +++ src/java/org/apache/nutch/crawl/Generator.java	(working copy)
> @@ -63,6 +63,8 @@
>        output.collect(crawlDatum, key);          // invert for sort by score
>      }
>  
> +    public void finished() {}
> +    
>      /** Partition by host (value). */
>      public int getPartition(WritableComparable key, Writable value,
>                              int numReduceTasks) {
> Index: src/java/org/apache/nutch/crawl/CrawlDbReducer.java
> ===================================================================
> --- src/java/org/apache/nutch/crawl/CrawlDbReducer.java	(revision 374781)
> +++ src/java/org/apache/nutch/crawl/CrawlDbReducer.java	(working copy)
> @@ -115,4 +115,5 @@
>      }
>    }
>  
> +  public void finished() {}
>  }
> Index: src/java/org/apache/nutch/parse/ParseSegment.java
> ===================================================================
> --- src/java/org/apache/nutch/parse/ParseSegment.java	(revision 374776)
> +++ src/java/org/apache/nutch/parse/ParseSegment.java	(working copy)
> @@ -78,6 +78,8 @@
>      throws IOException {
>      output.collect(key, (Writable)values.next()); // collect first value
>    }
> +  
> +  public void finished() {}
>  
>    public void parse(File segment) throws IOException {
>      LOG.info("Parse: starting");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
   http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see:
   http://www.atlassian.com/software/jira