Posted to commits@nutch.apache.org by cu...@apache.org on 2005/06/09 22:51:10 UTC

svn commit: r189816 - in /lucene/nutch/branches/mapred: bin/ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/db/

Author: cutting
Date: Thu Jun  9 13:51:09 2005
New Revision: 189816

URL: http://svn.apache.org/viewcvs?rev=189816&view=rev
Log:
First working version of MapReduce-based indexing.

Modified:
    lucene/nutch/branches/mapred/bin/nutch
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java

Modified: lucene/nutch/branches/mapred/bin/nutch
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/bin/nutch?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/bin/nutch (original)
+++ lucene/nutch/branches/mapred/bin/nutch Thu Jun  9 13:51:09 2005
@@ -36,7 +36,7 @@
   echo "  parse             parse a segment's pages"
   echo "  updatedb          update crawl db from segments after fetching"
   echo "  invertlinks       create a linkdb from parsed segments"
-  echo "  index             run the indexer on a segment's fetcher output"
+  echo "  index             run the indexer on parsed segments and linkdb"
   echo "  merge             merge several segment indexes"
   echo "  dedup             remove duplicates from a set of segment indexes"
   echo "  updatesegs        update segments with link data from the db"
@@ -142,7 +142,7 @@
 elif [ "$COMMAND" = "invertlinks" ] ; then
   CLASS=org.apache.nutch.crawl.LinkDb
 elif [ "$COMMAND" = "index" ] ; then
-  CLASS=org.apache.nutch.indexer.IndexSegment
+  CLASS=org.apache.nutch.crawl.Indexer
 elif [ "$COMMAND" = "merge" ] ; then
   CLASS=org.apache.nutch.indexer.IndexMerger
 elif [ "$COMMAND" = "dedup" ] ; then

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java Thu Jun  9 13:51:09 2005
@@ -41,8 +41,10 @@
   public static final byte STATUS_FETCH_RETRY = 6;
   public static final byte STATUS_FETCH_GONE = 7;
 
+  private static final float MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000;
+
   private byte status;
-  private long nextFetch = System.currentTimeMillis();
+  private long fetchTime = System.currentTimeMillis();
   private byte retries;
   private float fetchInterval;
   private int linkCount;
@@ -63,8 +65,12 @@
   public byte getStatus() { return status; }
   public void setStatus(int status) { this.status = (byte)status; }
 
-  public long getNextFetchTime() { return nextFetch; }
-  public void setNextFetchTime(long nextFetch) { this.nextFetch = nextFetch; }
+  public long getFetchTime() { return fetchTime; }
+  public void setFetchTime(long fetchTime) { this.fetchTime = fetchTime; }
+
+  public void setNextFetchTime() {
+    fetchTime += (long)(MILLISECONDS_PER_DAY*fetchInterval);
+  }
 
   public byte getRetriesSinceFetch() { return retries; }
   public void setRetriesSinceFetch(int retries) {this.retries = (byte)retries;}
@@ -94,7 +100,7 @@
       throw new VersionMismatchException(CUR_VERSION, version);
 
     status = in.readByte();
-    nextFetch = in.readLong();
+    fetchTime = in.readLong();
     retries = in.readByte();
     fetchInterval = in.readFloat();
     linkCount = in.readInt();
@@ -106,7 +112,7 @@
   public void write(DataOutput out) throws IOException {
     out.writeByte(CUR_VERSION);                   // store current version
     out.writeByte(status);
-    out.writeLong(nextFetch);
+    out.writeLong(fetchTime);
     out.writeByte(retries);
     out.writeFloat(fetchInterval);
     out.writeInt(linkCount);
@@ -115,7 +121,7 @@
   /** Copy the contents of another instance into this instance. */
   public void set(CrawlDatum that) {
     this.status = that.status;
-    this.nextFetch = that.nextFetch;
+    this.fetchTime = that.fetchTime;
     this.retries = that.retries;
     this.fetchInterval = that.fetchInterval;
     this.linkCount = that.linkCount;
@@ -157,7 +163,7 @@
     StringBuffer buf = new StringBuffer();
     buf.append("Version: " + CUR_VERSION + "\n");
     buf.append("Status: " + getStatus() + "\n");
-    buf.append("Next fetch: " + new Date(getNextFetchTime()) + "\n");
+    buf.append("Fetch time: " + new Date(getFetchTime()) + "\n");
     buf.append("Retries since fetch: " + getRetriesSinceFetch() + "\n");
     buf.append("Retry interval: " + getFetchInterval() + " days\n");
     buf.append("Link Count: " + getLinkCount() + "\n");
@@ -170,7 +176,7 @@
     CrawlDatum other = (CrawlDatum)o;
     return
       (this.status == other.status) &&
-      (this.nextFetch == other.nextFetch) &&
+      (this.fetchTime == other.fetchTime) &&
       (this.retries == other.retries) &&
       (this.fetchInterval == other.fetchInterval) &&
       (this.linkCount == other.linkCount);
@@ -179,7 +185,7 @@
   public int hashCode() {
     return
       status ^
-      ((int)nextFetch) ^
+      ((int)fetchTime) ^
       retries ^
       Float.floatToIntBits(fetchInterval) ^
       linkCount;
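
With the rename from nextFetch to fetchTime, the field now records when a page is due to be fetched (and, after a fetch, when it was fetched), and the new setNextFetchTime() advances it by the per-page interval, measured in days. A minimal standalone sketch of that arithmetic, with the constant and field names copied from the diff above and the 30-day db.default.fetch.interval default (seen in ParseSegment below) as the interval:

  // Standalone sketch of CrawlDatum's fetch-time arithmetic.
  public class FetchTimeSketch {
    private static final float MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000;

    private long fetchTime = System.currentTimeMillis();
    private float fetchInterval = 30f;            // days between fetches

    // Advance the scheduled fetch time by one fetch interval.
    public void setNextFetchTime() {
      fetchTime += (long)(MILLISECONDS_PER_DAY * fetchInterval);
    }

    public static void main(String[] args) {
      FetchTimeSketch datum = new FetchTimeSketch();
      System.out.println(new java.util.Date(datum.fetchTime));
      datum.setNextFetchTime();                   // roughly 30 days later
      System.out.println(new java.util.Date(datum.fetchTime));
    }
  }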

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java Thu Jun  9 13:51:09 2005
@@ -75,6 +75,7 @@
     case CrawlDatum.STATUS_FETCH_SUCCESS:         // successful fetch
       result = highest;                           // use new entry
       result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
+      result.setNextFetchTime();
       break;
 
     case CrawlDatum.STATUS_FETCH_RETRY:           // temporary failure
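
This one-line addition is what closes the re-fetch loop: once a page is fetched successfully, its fetchTime is pushed one interval into the future, and the Generator change below then skips the entry until it is due again. Paraphrasing the two hunks side by side:

  // CrawlDbReducer.reduce(), on STATUS_FETCH_SUCCESS:
  result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
  result.setNextFetchTime();                  // due again in fetchInterval days

  // Generator, when selecting entries for the next fetch list:
  if (crawlDatum.getFetchTime() > curTime)
    return;                                   // not time yet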

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Thu Jun  9 13:51:09 2005
@@ -35,6 +35,8 @@
   public static final Logger LOG =
     LogFormatter.getLogger("org.apache.nutch.fetcher.Fetcher");
   
+  public static final String DIGEST_KEY = "nutch.content.digest";
+
   private RecordReader input;
   private OutputCollector output;
 
@@ -155,6 +157,9 @@
         String url = key.toString();
         content = new Content(url, url, new byte[0], "", new Properties());
       }
+
+      content.getMetadata().setProperty           // add digest to metadata
+        (DIGEST_KEY, MD5Hash.digest(content.getContent()).toString());
 
       try {
         output.collect(key, new FetcherOutput(datum, content));
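
The fetcher now stamps every piece of content with an MD5 digest of its raw bytes under the nutch.content.digest key; the Indexer below copies it into the un-indexed "digest" field that dedup relies on. A self-contained sketch of the same idea using only the JDK (java.security.MessageDigest standing in for Nutch's MD5Hash):

  import java.security.MessageDigest;
  import java.util.Properties;

  public class DigestSketch {
    public static final String DIGEST_KEY = "nutch.content.digest";

    public static void main(String[] args) throws Exception {
      byte[] content = "<html>hello</html>".getBytes();
      byte[] md5 = MessageDigest.getInstance("MD5").digest(content);
      StringBuffer hex = new StringBuffer();    // hex-encode, as MD5Hash does
      for (int i = 0; i < md5.length; i++) {
        hex.append(Integer.toString((md5[i] & 0xff) + 0x100, 16).substring(1));
      }
      Properties meta = new Properties();       // stand-in for content metadata
      meta.setProperty(DIGEST_KEY, hex.toString());
      System.out.println(meta.getProperty(DIGEST_KEY));
    }
  }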

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java Thu Jun  9 13:51:09 2005
@@ -55,7 +55,7 @@
       if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_GONE)
         return;                                   // don't retry
 
-      if (crawlDatum.getNextFetchTime() > curTime)
+      if (crawlDatum.getFetchTime() > curTime)
         return;                                   // not time yet
 
       output.collect(crawlDatum, key);          // invert for sort by linkCount

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java Thu Jun  9 13:51:09 2005
@@ -27,7 +27,11 @@
 import org.apache.nutch.mapred.*;
 import org.apache.nutch.parse.*;
 import org.apache.nutch.analysis.*;
+
 import org.apache.nutch.indexer.*;
+import org.apache.nutch.pagedb.FetchListEntry;
+import org.apache.nutch.fetcher.FetcherOutput;
+import org.apache.nutch.db.Page;
 
 import org.apache.lucene.index.*;
 import org.apache.lucene.document.*;
@@ -80,6 +84,9 @@
       writer.setTermIndexInterval
         (job.getInt("indexer.termIndexInterval", 128));
       writer.maxFieldLength = job.getInt("indexer.max.tokens", 10000);
+      //writer.infoStream = LogFormatter.getLogStream(LOG, Level.FINE);
+      writer.setUseCompoundFile(false);
+      writer.setSimilarity(new NutchSimilarity());
 
       return new RecordWriter() {
 
@@ -118,12 +125,15 @@
   public void reduce(WritableComparable key, Iterator values,
                      OutputCollector output) throws IOException {
     Inlinks inlinks = null;
+    CrawlDatum crawlDatum = null;
     ParseData parseData = null;
     ParseText parseText = null;
     while (values.hasNext()) {
       Object value = ((ObjectWritable)values.next()).get(); // unwrap
       if (value instanceof Inlinks) {
         inlinks = (Inlinks)value;
+      } else if (value instanceof CrawlDatum) {
+        crawlDatum = (CrawlDatum)value;
       } else if (value instanceof ParseData) {
         parseData = (ParseData)value;
       } else if (value instanceof ParseText) {
@@ -133,30 +143,47 @@
       }
     }      
 
-    if (parseText == null || parseData == null) {
+    if (crawlDatum == null || parseText == null || parseData == null) {
       return;                                     // only have inlinks
     }
 
     Document doc = new Document();
+    Properties meta = parseData.getMetadata();
 
-    // add docno & segment, used to map from merged index back to segment files
-    //doc.add(Field.UnIndexed("docNo", Long.toString(docNo, 16)));
-    //doc.add(Field.UnIndexed("segment", segmentName));
+    // add segment, used to map from merged index back to segment files
+    doc.add(Field.UnIndexed("segment",
+                            meta.getProperty(ParseSegment.SEGMENT_NAME_KEY)));
 
     // add digest, used by dedup
-    //doc.add(Field.UnIndexed("digest", fo.getMD5Hash().toString()));
+    doc.add(Field.UnIndexed("digest", meta.getProperty(Fetcher.DIGEST_KEY)));
 
-    // 4. Apply boost to all indexed fields.
+    // compute boost
     float boost =
-      IndexSegment.calculateBoost(1.0f,scorePower, boostByLinkCount,
+      IndexSegment.calculateBoost(1.0f, scorePower, boostByLinkCount,
                                   inlinks == null ? 0 : inlinks.size());
+    // apply boost to all indexed fields.
     doc.setBoost(boost);
     // store boost for use by explain and dedup
     doc.add(Field.UnIndexed("boost", Float.toString(boost)));
 
+//     LOG.info("Url: "+key.toString());
+//     LOG.info("Title: "+parseData.getTitle());
+//     LOG.info(crawlDatum.toString());
+//     if (inlinks != null) {
+//       LOG.info(inlinks.toString());
+//     }
+
     try {
-      doc = IndexingFilters.filter(doc, new ParseImpl(parseText, parseData),
-                                   null);
+      // dummy up a FetcherOutput so that we can use existing indexing filters
+      // TODO: modify IndexingFilter interface to use Inlinks, etc. 
+      String[] anchors = inlinks!=null ? inlinks.getAnchors() : new String[0];
+      FetcherOutput fo =
+        new FetcherOutput(new FetchListEntry(true,new Page((UTF8)key),anchors),
+                          null, null);
+      fo.setFetchDate(crawlDatum.getFetchTime());
+
+      // run indexing filters
+      doc = IndexingFilters.filter(doc,new ParseImpl(parseText, parseData),fo);
     } catch (IndexingException e) {
       LOG.warning("Error indexing "+key+": "+e);
       return;
@@ -165,29 +192,23 @@
     output.collect(key, new ObjectWritable(doc));
   }
 
-  public void index(File indexDir, File segmentsDir) throws IOException {
-    JobConf job = Indexer.createJob(getConf(), indexDir);
-    job.setInputDir(segmentsDir);
-    job.set("mapred.input.subdir", ParseData.DIR_NAME);
-    JobClient.runJob(job);
-  }
+  public void index(File indexDir, File linkDb, File[] segments)
+    throws IOException {
+
+    JobConf job = new JobConf(getConf());
 
-  public void index(File indexDir, File[] segments) throws IOException {
-    JobConf job = Indexer.createJob(getConf(), indexDir);
     for (int i = 0; i < segments.length; i++) {
+      job.addInputDir(new File(segments[i], CrawlDatum.FETCH_DIR_NAME));
       job.addInputDir(new File(segments[i], ParseData.DIR_NAME));
+      job.addInputDir(new File(segments[i], ParseText.DIR_NAME));
     }
-    JobClient.runJob(job);
-  }
 
-  private static JobConf createJob(NutchConf config, File indexDir) {
-    JobConf job = new JobConf(config);
+    job.addInputDir(new File(linkDb, LinkDb.CURRENT_NAME));
 
     job.setInputFormat(InputFormat.class);
     job.setInputKeyClass(UTF8.class);
     job.setInputValueClass(ObjectWritable.class);
 
-    job.setMapperClass(Indexer.class);
     //job.setCombinerClass(Indexer.class);
     job.setReducerClass(Indexer.class);
 
@@ -196,20 +217,23 @@
     job.setOutputKeyClass(UTF8.class);
     job.setOutputValueClass(ObjectWritable.class);
 
-    return job;
+    JobClient.runJob(job);
   }
 
   public static void main(String[] args) throws Exception {
     Indexer indexer = new Indexer(NutchConf.get());
     
     if (args.length < 2) {
-      System.err.println("Usage: <linkdb> <segments>");
+      System.err.println("Usage: <index> <linkdb> <segment> <segment> ...");
       return;
     }
     
-    indexer.index(new File(args[0]), new File(args[1]));
-  }
-
+    File[] segments = new File[args.length-2];
+    for (int i = 2; i < args.length; i++) {
+      segments[i-2] = new File(args[i]);
+    }
 
+    indexer.index(new File(args[0]), new File(args[1]), segments);
+  }
 
 }
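
The heart of the new indexer is a reduce-side join: for each URL, the job reads crawl data (crawl_fetch), parse data, parse text, and linkdb entries into a single value list, then dispatches on the runtime type of each unwrapped value. A stripped-down, runnable sketch of that pattern, with plain Objects standing in for ObjectWritable and empty marker classes standing in for the Nutch types:

  import java.util.*;

  public class JoinSketch {
    static class CrawlDatum {}                  // stand-ins for Nutch's types
    static class Inlinks {}
    static class ParseData {}
    static class ParseText {}

    // One reduce call sees every value emitted for a single URL.
    static void reduce(String url, Iterator values) {
      CrawlDatum crawlDatum = null;
      Inlinks inlinks = null;
      ParseData parseData = null;
      ParseText parseText = null;
      while (values.hasNext()) {
        Object value = values.next();           // already unwrapped here
        if (value instanceof Inlinks) {
          inlinks = (Inlinks)value;
        } else if (value instanceof CrawlDatum) {
          crawlDatum = (CrawlDatum)value;
        } else if (value instanceof ParseData) {
          parseData = (ParseData)value;
        } else if (value instanceof ParseText) {
          parseText = (ParseText)value;
        }
      }
      // a page is only indexable once all three required parts are present;
      // a key that has only inlinks (a page we merely link to) is dropped
      if (crawlDatum == null || parseData == null || parseText == null)
        return;
      System.out.println("indexing " + url);
    }

    public static void main(String[] args) {
      List values = new ArrayList();
      values.add(new CrawlDatum());
      values.add(new ParseData());
      values.add(new ParseText());
      reduce("http://example.com/", values.iterator());
    }
  }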

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java Thu Jun  9 13:51:09 2005
@@ -17,6 +17,7 @@
 package org.apache.nutch.crawl;
 
 import java.io.*;
+import java.net.*;
 import java.util.*;
 
 import org.apache.nutch.io.*;
@@ -50,5 +51,45 @@
       ((Writable)inlinks.get(i)).write(out);
     }
   }
+
+  public String toString() {
+    StringBuffer buffer = new StringBuffer();
+    buffer.append("Inlinks:\n");
+    for (int i = 0; i < inlinks.size(); i++) {
+      buffer.append(" ");
+      buffer.append(inlinks.get(i));
+      buffer.append("\n");
+    }
+    return buffer.toString();
+  }
+
+  /** Return the set of anchor texts.  Only a single anchor with a given text
+   * is permitted from a given domain. */
+  public String[] getAnchors() throws IOException {
+    HashMap domainToAnchors = new HashMap();
+    ArrayList results = new ArrayList();
+    for (int i = 0; i < inlinks.size(); i++) {
+      Inlink inlink = (Inlink)inlinks.get(i);
+      String anchor = inlink.getAnchor();
+
+      if (anchor.length() == 0)                   // skip empty anchors
+        continue;
+      String domain = null;                       // extract domain name
+      try {
+        domain = new URL(inlink.getFromUrl()).getHost();
+      } catch (MalformedURLException e) {}
+      Set domainAnchors = (Set)domainToAnchors.get(domain);
+      if (domainAnchors == null) {
+        domainAnchors = new HashSet();
+        domainToAnchors.put(domain, domainAnchors);
+      }
+      if (domainAnchors.add(anchor)) {            // new anchor from domain
+        results.add(anchor);                      // collect it
+      }
+    }
+
+    return (String[])results.toArray(new String[results.size()]);
+  }
+
 
 }
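
getAnchors() keeps at most one copy of a given anchor text per source host, which damps the effect of repeated (e.g. sitewide) anchors from a single site. A self-contained sketch of the same filter, with an Inlink reduced to a hypothetical { fromUrl, anchorText } string pair:

  import java.net.*;
  import java.util.*;

  public class AnchorSketch {
    public static String[] getAnchors(String[][] inlinks) {
      HashMap domainToAnchors = new HashMap();
      ArrayList results = new ArrayList();
      for (int i = 0; i < inlinks.length; i++) {
        String anchor = inlinks[i][1];
        if (anchor.length() == 0)               // skip empty anchors
          continue;
        String domain = null;                   // extract domain name
        try {
          domain = new URL(inlinks[i][0]).getHost();
        } catch (MalformedURLException e) {}
        Set domainAnchors = (Set)domainToAnchors.get(domain);
        if (domainAnchors == null) {
          domainAnchors = new HashSet();
          domainToAnchors.put(domain, domainAnchors);
        }
        if (domainAnchors.add(anchor)) {        // first use from this domain
          results.add(anchor);
        }
      }
      return (String[])results.toArray(new String[results.size()]);
    }

    public static void main(String[] args) {
      String[][] links = {
        {"http://a.example.com/1", "Nutch"},
        {"http://a.example.com/2", "Nutch"},    // repeat from same host: dropped
        {"http://b.example.org/", "Nutch"},     // same text, new host: kept
      };
      System.out.println(Arrays.asList(getAnchors(links)));
    }
  }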

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java Thu Jun  9 13:51:09 2005
@@ -35,7 +35,10 @@
   public static final Logger LOG =
     LogFormatter.getLogger(Parser.class.getName());
 
+  public static final String SEGMENT_NAME_KEY = "nutch.segment.name";
+
   private float interval;
+  private String segmentName;
 
   private UrlNormalizer urlNormalizer = UrlNormalizerFactory.getNormalizer();
         
@@ -47,6 +50,7 @@
 
   public void configure(JobConf job) {
     interval = job.getFloat("db.default.fetch.interval", 30f);
+    segmentName = job.get(SEGMENT_NAME_KEY);
   }
 
   public void map(WritableComparable key, Writable value,
@@ -65,6 +69,7 @@
     }
 
     if (status.isSuccess()) {
+      parse.getData().getMetadata().setProperty(SEGMENT_NAME_KEY, segmentName);
       output.collect(key, new ParseImpl(parse.getText(), parse.getData()));
     } else {
       LOG.warning("Error parsing: "+key+": "+status.toString());
@@ -134,6 +139,9 @@
 
   public void parse(File segment) throws IOException {
     JobConf job = new JobConf(getConf());
+
+    job.set(SEGMENT_NAME_KEY, segment.getName());
+
     job.setInputDir(new File(segment, Content.DIR_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
     job.setInputKeyClass(UTF8.class);
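
The segment name travels from the job configuration into each successfully parsed document's metadata, and the Indexer above reads it back out into the un-indexed "segment" field; that is how a hit in a merged index is traced back to its segment files. A tiny sketch of the round trip, with Properties standing in for both the JobConf and the parse metadata, and a made-up segment name:

  import java.util.Properties;

  public class SegmentNameSketch {
    public static final String SEGMENT_NAME_KEY = "nutch.segment.name";

    public static void main(String[] args) {
      Properties jobConf = new Properties();    // stand-in for the JobConf
      jobConf.setProperty(SEGMENT_NAME_KEY, "20050609135109");

      // ParseSegment.map(): stamp each successful parse with the segment name
      Properties parseMeta = new Properties();  // stand-in for parse metadata
      parseMeta.setProperty(SEGMENT_NAME_KEY,
                            jobConf.getProperty(SEGMENT_NAME_KEY));

      // Indexer.reduce(): copy it into the "segment" field of the document
      System.out.println("segment = " + parseMeta.getProperty(SEGMENT_NAME_KEY));
    }
  }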

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java Thu Jun  9 13:51:09 2005
@@ -64,6 +64,10 @@
     md5 = new MD5Hash();    // initialize for readFields()
   }
 
+  public Page(UTF8 url) {
+    this.url = url;
+  }
+
   /** Construct a new, default page, due to be fetched. */
   public Page(String urlString, MD5Hash md5) throws MalformedURLException {
     setURL(urlString);