You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/10/10 21:39:39 UTC
svn commit: r312722 - /lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java
Author: cutting
Date: Mon Oct 10 12:39:36 2005
New Revision: 312722
URL: http://svn.apache.org/viewcvs?rev=312722&view=rev
Log:
Perform deduping in reduce. Delete temp files.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java?rev=312722&r1=312721&r2=312722&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java Mon Oct 10 12:39:36 2005
@@ -36,7 +36,7 @@
* Duplicates have either the same contents (via MD5 hash) or the same URL.
******************************************************************/
public class DeleteDuplicates extends NutchConfigured
- implements Mapper, OutputFormat {
+ implements Mapper, Reducer, OutputFormat {
private static final Logger LOG =
LogFormatter.getLogger("org.apache.nutch.crawl.DeleteDuplicates");
@@ -230,12 +230,21 @@
}
}
+ private NutchFileSystem fs;
+
public DeleteDuplicates() { super(null); }
public DeleteDuplicates(NutchConf conf) { super(conf); }
- public void configure(JobConf job) {}
+ public void configure(JobConf job) {
+ try {
+ fs = NutchFileSystem.get(job);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ /** Map [*,IndexDoc] pairs to [index,doc] pairs. */
public void map(WritableComparable key, Writable value,
OutputCollector output, Reporter reporter)
throws IOException {
@@ -243,30 +252,31 @@
output.collect(indexDoc.index, new IntWritable(indexDoc.doc));
}
- private HashMap readers = new HashMap();
+ /** Delete docs named in values from index named in key. */
+ public void reduce(WritableComparable key, Iterator values,
+ OutputCollector output, Reporter reporter)
+ throws IOException {
+ File index = new File(key.toString());
+ IndexReader reader = IndexReader.open(new NdfsDirectory(fs, index, false));
+ try {
+ while (values.hasNext()) {
+ reader.delete(((IntWritable)values.next()).get());
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ /** Write nothing. */
public RecordWriter getRecordWriter(final NutchFileSystem fs,
final JobConf job,
final String name) throws IOException {
return new RecordWriter() {
- /** Delete value from index named in key. */
public void write(WritableComparable key, Writable value)
throws IOException {
- IndexReader reader = (IndexReader)readers.get(key);
- if (reader == null) {
- File index = new File(key.toString());
- reader = IndexReader.open(new NdfsDirectory(fs, index, false));
- readers.put(key, reader);
- }
- reader.delete(((IntWritable)value).get());
- }
-
- /** Close indexes, flushing deletions. */
- public void close(Reporter reporter) throws IOException {
- Iterator i = readers.values().iterator();
- while (i.hasNext()) {
- ((IndexReader)i.next()).close();
- }
- }
+ throw new UnsupportedOperationException();
+ }
+ public void close(Reporter reporter) throws IOException {}
};
}
@@ -310,12 +320,16 @@
job.setInputValueClass(IndexDoc.class);
job.setMapperClass(DeleteDuplicates.class);
+ job.setReducerClass(DeleteDuplicates.class);
job.setOutputFormat(DeleteDuplicates.class);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(IntWritable.class);
JobClient.runJob(job);
+
+ new JobClient(getConf()).getFs().delete(hashDir);
+
LOG.info("Dedup: done");
}