Posted to commits@nutch.apache.org by cu...@apache.org on 2005/10/10 21:39:39 UTC

svn commit: r312722 - /lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java

Author: cutting
Date: Mon Oct 10 12:39:36 2005
New Revision: 312722

URL: http://svn.apache.org/viewcvs?rev=312722&view=rev
Log:
Perform deduping in reduce.  Delete temp files.
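
The change below moves the deletions out of the OutputFormat's RecordWriter (which kept a HashMap of open IndexReaders) and into reduce, where the shuffle hands each index key all of its doc ids at once, so every index is opened and closed exactly once. For illustration only, here is a minimal, self-contained Java sketch of that grouping idea; the index paths and doc ids are invented, and the real job uses NutchFileSystem, NdfsDirectory and Lucene's IndexReader as shown in the diff:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.TreeMap;

    public class DedupSketch {
      public static void main(String[] args) {
        // "Map" output: (index path, doc id) pairs for duplicate documents.
        // Paths and doc ids here are made up for the example.
        Object[][] mapOutput = {
          { "indexes/part-00000", 3 },
          { "indexes/part-00001", 7 },
          { "indexes/part-00000", 9 },
        };

        // "Shuffle": group doc ids by index path, as the MapReduce
        // framework does between the map and reduce phases.
        TreeMap<String, List<Integer>> byIndex =
          new TreeMap<String, List<Integer>>();
        for (Object[] pair : mapOutput) {
          String index = (String) pair[0];
          if (!byIndex.containsKey(index))
            byIndex.put(index, new ArrayList<Integer>());
          byIndex.get(index).add((Integer) pair[1]);
        }

        // "Reduce": each index is opened exactly once, every duplicate
        // doc id for it is deleted, and closing it flushes the deletions.
        for (String index : byIndex.keySet()) {
          System.out.println("open " + index);
          for (Iterator<Integer> i = byIndex.get(index).iterator(); i.hasNext();)
            System.out.println("  delete doc " + i.next());
          System.out.println("close " + index + " (deletions flushed)");
        }
      }
    }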

Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java?rev=312722&r1=312721&r2=312722&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/DeleteDuplicates.java Mon Oct 10 12:39:36 2005
@@ -36,7 +36,7 @@
  * Duplicates have either the same contents (via MD5 hash) or the same URL.
  ******************************************************************/
 public class DeleteDuplicates extends NutchConfigured
-  implements Mapper, OutputFormat {
+  implements Mapper, Reducer, OutputFormat {
   private static final Logger LOG =
     LogFormatter.getLogger("org.apache.nutch.crawl.DeleteDuplicates");
 
@@ -230,12 +230,21 @@
     }
   }
     
+  private NutchFileSystem fs;
+
   public DeleteDuplicates() { super(null); }
 
   public DeleteDuplicates(NutchConf conf) { super(conf); }
 
-  public void configure(JobConf job) {}
+  public void configure(JobConf job) {
+    try {
+      fs = NutchFileSystem.get(job);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
+  /** Map [*,IndexDoc] pairs to [index,doc] pairs. */
   public void map(WritableComparable key, Writable value,
                   OutputCollector output, Reporter reporter)
     throws IOException {
@@ -243,30 +252,31 @@
     output.collect(indexDoc.index, new IntWritable(indexDoc.doc));
   }
 
-  private HashMap readers = new HashMap();
+  /** Delete docs named in values from index named in key. */
+  public void reduce(WritableComparable key, Iterator values,
+                     OutputCollector output, Reporter reporter)
+    throws IOException {
+    File index = new File(key.toString());
+    IndexReader reader = IndexReader.open(new NdfsDirectory(fs, index, false));
+    try {
+      while (values.hasNext()) {
+        reader.delete(((IntWritable)values.next()).get());
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /** Write nothing. */
   public RecordWriter getRecordWriter(final NutchFileSystem fs,
                                       final JobConf job,
                                       final String name) throws IOException {
     return new RecordWriter() {                   
-        /** Delete value from index named in key. */
         public void write(WritableComparable key, Writable value)
           throws IOException {
-          IndexReader reader = (IndexReader)readers.get(key);
-          if (reader == null) {
-            File index = new File(key.toString());
-            reader = IndexReader.open(new NdfsDirectory(fs, index, false));
-            readers.put(key, reader);
-          }
-          reader.delete(((IntWritable)value).get());
-        }
-        
-        /** Close indexes, flushing deletions. */
-        public void close(Reporter reporter) throws IOException {
-          Iterator i = readers.values().iterator();
-          while (i.hasNext()) {
-            ((IndexReader)i.next()).close();
-          }
-        }
+          throw new UnsupportedOperationException();
+        }        
+        public void close(Reporter reporter) throws IOException {}
       };
   }
 
@@ -310,12 +320,16 @@
     job.setInputValueClass(IndexDoc.class);
 
     job.setMapperClass(DeleteDuplicates.class);
+    job.setReducerClass(DeleteDuplicates.class);
 
     job.setOutputFormat(DeleteDuplicates.class);
     job.setOutputKeyClass(UTF8.class);
     job.setOutputValueClass(IntWritable.class);
 
     JobClient.runJob(job);
+
+    new JobClient(getConf()).getFs().delete(hashDir);
+
     LOG.info("Dedup: done");
   }