You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ab...@apache.org on 2006/04/04 13:03:13 UTC

svn commit: r391271 - /lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java

Author: ab
Date: Tue Apr  4 04:03:09 2006
New Revision: 391271

URL: http://svn.apache.org/viewcvs?rev=391271&view=rev
Log:
Use a separate float value for sorting and selecting topN records.
This decouples the selection process from values in CrawlDatum
and its compareTo. See NUTCH-240 for more details.

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java?rev=391271&r1=391270&r2=391271&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java Tue Apr  4 04:03:09 2006
@@ -38,6 +38,26 @@
 
   public static final Logger LOG =
     LogFormatter.getLogger("org.apache.nutch.crawl.Generator");
+  
+  /** Value record carrying a URL together with its CrawlDatum, so that both
+   *  travel as a single Writable value. */
+  public static class SelectorEntry implements Writable {
+    // Allocated up front so readFields() can populate the existing objects.
+    public UTF8 url = new UTF8();
+    public CrawlDatum datum = new CrawlDatum();
+
+    /** Writes the URL, then the datum. */
+    public void write(DataOutput out) throws IOException {
+      this.url.write(out);
+      this.datum.write(out);
+    }
+
+    /** Reads the URL, then the datum, in the same order write() emits them. */
+    public void readFields(DataInput in) throws IOException {
+      this.url.readFields(in);
+      this.datum.readFields(in);
+    }
+  }
 
   /** Selects entries due for fetch. */
   public static class Selector implements Mapper, Partitioner, Reducer {
@@ -48,6 +68,8 @@
     private int maxPerHost;
     private Partitioner hostPartitioner = new PartitionUrlByHost();
     private URLFilters filters;
+    private SelectorEntry entry = new SelectorEntry();
+    private FloatWritable sortValue = new FloatWritable();
 
     public void configure(JobConf job) {
       curTime = job.getLong("crawl.gen.curTime", System.currentTimeMillis());
@@ -78,13 +100,17 @@
       if (crawlDatum.getFetchTime() > curTime)
         return;                                   // not time yet
 
-      output.collect(crawlDatum, key);          // invert for sort by score
+      // sort by decreasing score
+      sortValue.set(crawlDatum.getScore());
+      entry.datum = crawlDatum;
+      entry.url = (UTF8)key;
+      output.collect(sortValue, entry);          // key on the score so records sort by it
     }
 
-    /** Partition by host (value). */
+    /** Partition by host. */
     public int getPartition(WritableComparable key, Writable value,
                             int numReduceTasks) {
-      return hostPartitioner.getPartition((WritableComparable)value, key,
+      return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
                                           numReduceTasks);
     }
 
@@ -95,7 +121,8 @@
 
       while (values.hasNext() && count < limit) {
 
-        UTF8 url = (UTF8)values.next();
+        SelectorEntry entry = (SelectorEntry)values.next();
+        UTF8 url = entry.url;
 
         if (maxPerHost > 0) {                     // are we counting hosts?
           String host = new URL(url.toString()).getHost();
@@ -109,7 +136,7 @@
           }
         }
 
-        output.collect(key, url);
+        output.collect(key, entry);
 
         // Count is incremented only when we keep the URL
         // maxPerHost may cause us to skip it.
@@ -120,6 +147,14 @@
 
   }
 
+  /** Mapper that unwraps each SelectorEntry value into a (url, datum) pair; the input key is ignored. */
+  public static class SelectorInverseMapper extends MapReduceBase implements Mapper {
+    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+      SelectorEntry entry = (SelectorEntry)value;
+      output.collect(entry.url, entry.datum);
+    }
+  }
+  
   /** Sort fetch lists by hash of URL. */
   public static class HashComparator extends WritableComparator {
     public HashComparator() { super(UTF8.class); }
@@ -209,8 +244,8 @@
 
     job.setOutputDir(tempDir);
     job.setOutputFormat(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(CrawlDatum.class);
-    job.setOutputValueClass(UTF8.class);
+    job.setOutputKeyClass(FloatWritable.class);
+    job.setOutputValueClass(SelectorEntry.class);
     JobClient.runJob(job);
 
     // invert again, paritition by host, sort by url hash
@@ -222,10 +257,10 @@
 
     job.setInputDir(tempDir);
     job.setInputFormat(SequenceFileInputFormat.class);
-    job.setInputKeyClass(CrawlDatum.class);
-    job.setInputValueClass(UTF8.class);
+    job.setInputKeyClass(FloatWritable.class);
+    job.setInputValueClass(SelectorEntry.class);
 
-    job.setMapperClass(InverseMapper.class);
+    job.setMapperClass(SelectorInverseMapper.class);
     job.setPartitionerClass(PartitionUrlByHost.class);
     job.setNumReduceTasks(numLists);