Posted to commits@nutch.apache.org by ab...@apache.org on 2006/04/28 21:56:48 UTC

svn commit: r397995 - /lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java

Author: ab
Date: Fri Apr 28 12:56:47 2006
New Revision: 397995

URL: http://svn.apache.org/viewcvs?rev=397995&view=rev
Log:
Implement fully incremental LinkDb updates.
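
In outline, invert() now writes the inverted links to a temporary output
directory and, if a linkdb already exists, runs a second merge job over the
old "current" data plus the new output; the LinkDbMerger reducer caps the
combined inlink list at db.max.inlinks before the result is installed. A
minimal sketch of that capped-merge step, expressed over plain java.util
lists of hypothetical inlink objects rather than the real Hadoop Writable
types used in the patch below:

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only (not Nutch code): the per-URL capped merge that
    // LinkDbMerger performs, shown with plain lists instead of Writables.
    class CappedMergeSketch {
      static <T> List<T> mergeCapped(List<T> existing, List<T> fresh, int maxInlinks) {
        List<T> merged = new ArrayList<T>(existing);
        for (T inlink : fresh) {
          if (merged.size() >= maxInlinks) break; // same cut-off as db.max.inlinks
          merged.add(inlink);
        }
        return merged;
      }
    }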

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java?rev=397995&r1=397994&r2=397995&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java Fri Apr 28 12:56:47 2006
@@ -23,6 +23,7 @@
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.LogFormatter;
 import org.apache.hadoop.mapred.*;
@@ -42,6 +43,34 @@
   private int maxAnchorLength;
   private int maxInlinks;
   private boolean ignoreInternalLinks;
+  
+  public static class LinkDbMerger extends MapReduceBase implements Reducer {
+    private int _maxInlinks;
+    
+    public void configure(JobConf job) {
+      super.configure(job);
+      _maxInlinks = job.getInt("db.max.inlinks", 10000);
+    }
+
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+      Inlinks inlinks = null;
+      while (values.hasNext()) {
+        if (inlinks == null) {
+          inlinks = (Inlinks)values.next();
+          continue;
+        }
+        Inlinks val = (Inlinks)values.next();
+        for (Iterator it = val.iterator(); it.hasNext(); ) {
+          if (inlinks.size() >= _maxInlinks) {
+            output.collect(key, inlinks);
+            return;
+          }
+          inlinks.add((Inlink)it.next());
+        }
+      }
+      output.collect(key, inlinks);
+    }
+  }
 
   public LinkDb() {
     super(null);
@@ -126,18 +155,17 @@
     output.collect(key, result);
   }
 
-
-  public void invert(File linkDb, File segmentsDir) throws IOException {
-    LOG.info("LinkDb: starting");
-    LOG.info("LinkDb: linkdb: " + linkDb);
-    LOG.info("LinkDb: segments: " + segmentsDir);
-
-    JobConf job = LinkDb.createJob(getConf(), linkDb);
-    job.setInputDir(segmentsDir);
-    job.set("mapred.input.subdir", ParseData.DIR_NAME);
-    JobClient.runJob(job);
-    LinkDb.install(job, linkDb);
-    LOG.info("LinkDb: done");
+  public void invert(File linkDb, final File segmentsDir) throws IOException {
+    final FileSystem fs = FileSystem.get(getConf());
+    File[] files = fs.listFiles(segmentsDir, new FileFilter() {
+      public boolean accept(File f) {
+        try {
+          if (fs.isDirectory(f)) return true;
+        } catch (IOException ioe) {};
+        return false;
+      }
+    });
+    invert(linkDb, files);
   }
 
   public void invert(File linkDb, File[] segments) throws IOException {
@@ -149,13 +177,24 @@
       job.addInputDir(new File(segments[i], ParseData.DIR_NAME));
     }
     JobClient.runJob(job);
+    FileSystem fs = FileSystem.get(getConf());
+    if (fs.exists(linkDb)) {
+      LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
+      // try to merge
+      File newLinkDb = job.getOutputDir();
+      job = LinkDb.createMergeJob(getConf(), linkDb);
+      job.addInputDir(new File(linkDb, CURRENT_NAME));
+      job.addInputDir(newLinkDb);
+      JobClient.runJob(job);
+      fs.delete(newLinkDb);
+    }
     LinkDb.install(job, linkDb);
     LOG.info("LinkDb: done");
   }
 
   private static JobConf createJob(Configuration config, File linkDb) {
     File newLinkDb =
-      new File(linkDb,
+      new File("linkdb-" +
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
     JobConf job = new NutchJob(config);
@@ -178,6 +217,29 @@
     return job;
   }
 
+  private static JobConf createMergeJob(Configuration config, File linkDb) {
+    File newLinkDb =
+      new File("linkdb-merge-" + 
+               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("linkdb merge " + linkDb);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setInputKeyClass(UTF8.class);
+    job.setInputValueClass(Inlinks.class);
+
+    job.setReducerClass(LinkDbMerger.class);
+
+    job.setOutputDir(newLinkDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setBoolean("mapred.output.compress", true);
+    job.setOutputKeyClass(UTF8.class);
+    job.setOutputValueClass(Inlinks.class);
+
+    return job;
+  }
+
   public static void install(JobConf job, File linkDb) throws IOException {
     File newLinkDb = job.getOutputDir();
     FileSystem fs = new JobClient(job).getFs();
@@ -190,25 +252,33 @@
   }
 
   public static void main(String[] args) throws Exception {
-    LinkDb linkDb = new LinkDb(NutchConfiguration.create());
+    Configuration conf = NutchConfiguration.create();
+    LinkDb linkDb = new LinkDb(conf);
     
     if (args.length < 2) {
       System.err.println("Usage: <linkdb> (-dir segmentsDir | segment1 segment2 ...)");
       return;
     }
-    boolean dir = false;
     File segDir = null;
+    final FileSystem fs = FileSystem.get(conf);
     File db = new File(args[0]);
     ArrayList segs = new ArrayList();
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-dir")) {
-        dir = true;
         segDir = new File(args[++i]);
+        File[] files = fs.listFiles(segDir, new FileFilter() {
+          public boolean accept(File f) {
+            try {
+              if (fs.isDirectory(f)) return true;
+            } catch (IOException ioe) {};
+            return false;
+          }
+        });
+        if (files != null) segs.addAll(Arrays.asList(files));
         break;
       } else segs.add(new File(args[i]));
     }
-    if (dir) linkDb.invert(db, segDir);
-    else linkDb.invert(db, (File[])segs.toArray(new File[segs.size()]));
+    linkDb.invert(db, (File[])segs.toArray(new File[segs.size()]));
   }