Posted to commits@nutch.apache.org by ab...@apache.org on 2006/04/28 21:56:48 UTC
svn commit: r397995 - /lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
Author: ab
Date: Fri Apr 28 12:56:47 2006
New Revision: 397995
URL: http://svn.apache.org/viewcvs?rev=397995&view=rev
Log:
Implement fully incremental LinkDb updates.
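In outline: the inversion job now writes inlinks for the new segments into a
temporary "linkdb-<random>" directory, and if a linkdb already exists, a second
map-reduce job (LinkDbMerger) reads both the current db and the fresh output,
concatenating each URL's inlink list up to the db.max.inlinks limit (default
10000) before install() swaps the merged result into place.

A minimal driver sketch of the incremental flow, assuming the API shown in the
diff below; the wrapper class name, paths and segment name are hypothetical:

  import java.io.File;
  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.nutch.crawl.LinkDb;
  import org.apache.nutch.util.NutchConfiguration;

  public class IncrementalLinkDbUpdate {
    public static void main(String[] args) throws IOException {
      Configuration conf = NutchConfiguration.create();
      LinkDb linkDb = new LinkDb(conf);
      // Invert only the newly fetched segment; when crawl/linkdb already
      // exists, the merge job folds the new inlinks into it instead of
      // overwriting the database.
      linkDb.invert(new File("crawl/linkdb"),
                    new File[] { new File("crawl/segments/20060428120000") });
    }
  }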
Modified:
lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java?rev=397995&r1=397994&r2=397995&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java Fri Apr 28 12:56:47 2006
@@ -23,6 +23,7 @@
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.LogFormatter;
import org.apache.hadoop.mapred.*;
@@ -42,6 +43,34 @@
private int maxAnchorLength;
private int maxInlinks;
private boolean ignoreInternalLinks;
+
+ public static class LinkDbMerger extends MapReduceBase implements Reducer {
+ private int _maxInlinks;
+
+ public void configure(JobConf job) {
+ super.configure(job);
+ _maxInlinks = job.getInt("db.max.inlinks", 10000);
+ }
+
+ public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+ Inlinks inlinks = null;
+ while (values.hasNext()) {
+ if (inlinks == null) {
+ inlinks = (Inlinks)values.next();
+ continue;
+ }
+ Inlinks val = (Inlinks)values.next();
+ for (Iterator it = val.iterator(); it.hasNext(); ) {
+ if (inlinks.size() >= _maxInlinks) {
+ output.collect(key, inlinks);
+ return;
+ }
+ inlinks.add((Inlink)it.next());
+ }
+ }
+ output.collect(key, inlinks);
+ }
+ }
public LinkDb() {
super(null);
@@ -126,18 +155,17 @@
output.collect(key, result);
}
-
- public void invert(File linkDb, File segmentsDir) throws IOException {
- LOG.info("LinkDb: starting");
- LOG.info("LinkDb: linkdb: " + linkDb);
- LOG.info("LinkDb: segments: " + segmentsDir);
-
- JobConf job = LinkDb.createJob(getConf(), linkDb);
- job.setInputDir(segmentsDir);
- job.set("mapred.input.subdir", ParseData.DIR_NAME);
- JobClient.runJob(job);
- LinkDb.install(job, linkDb);
- LOG.info("LinkDb: done");
+ public void invert(File linkDb, final File segmentsDir) throws IOException {
+ final FileSystem fs = FileSystem.get(getConf());
+ File[] files = fs.listFiles(segmentsDir, new FileFilter() {
+ public boolean accept(File f) {
+ try {
+ if (fs.isDirectory(f)) return true;
+ } catch (IOException ioe) {};
+ return false;
+ }
+ });
+ invert(linkDb, files);
}
public void invert(File linkDb, File[] segments) throws IOException {
@@ -149,13 +177,24 @@
job.addInputDir(new File(segments[i], ParseData.DIR_NAME));
}
JobClient.runJob(job);
+ FileSystem fs = FileSystem.get(getConf());
+ if (fs.exists(linkDb)) {
+ LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
+ // try to merge
+ File newLinkDb = job.getOutputDir();
+ job = LinkDb.createMergeJob(getConf(), linkDb);
+ job.addInputDir(new File(linkDb, CURRENT_NAME));
+ job.addInputDir(newLinkDb);
+ JobClient.runJob(job);
+ fs.delete(newLinkDb);
+ }
LinkDb.install(job, linkDb);
LOG.info("LinkDb: done");
}
private static JobConf createJob(Configuration config, File linkDb) {
File newLinkDb =
- new File(linkDb,
+ new File("linkdb-" +
Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
JobConf job = new NutchJob(config);
@@ -178,6 +217,29 @@
return job;
}
+ private static JobConf createMergeJob(Configuration config, File linkDb) {
+ File newLinkDb =
+ new File("linkdb-merge-" +
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf job = new NutchJob(config);
+ job.setJobName("linkdb merge " + linkDb);
+
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setInputKeyClass(UTF8.class);
+ job.setInputValueClass(Inlinks.class);
+
+ job.setReducerClass(LinkDbMerger.class);
+
+ job.setOutputDir(newLinkDb);
+ job.setOutputFormat(MapFileOutputFormat.class);
+ job.setBoolean("mapred.output.compress", true);
+ job.setOutputKeyClass(UTF8.class);
+ job.setOutputValueClass(Inlinks.class);
+
+ return job;
+ }
+
public static void install(JobConf job, File linkDb) throws IOException {
File newLinkDb = job.getOutputDir();
FileSystem fs = new JobClient(job).getFs();
@@ -190,25 +252,33 @@
}
public static void main(String[] args) throws Exception {
- LinkDb linkDb = new LinkDb(NutchConfiguration.create());
+ Configuration conf = NutchConfiguration.create();
+ LinkDb linkDb = new LinkDb(conf);
if (args.length < 2) {
System.err.println("Usage: <linkdb> (-dir segmentsDir | segment1 segment2 ...)");
return;
}
- boolean dir = false;
File segDir = null;
+ final FileSystem fs = FileSystem.get(conf);
File db = new File(args[0]);
ArrayList segs = new ArrayList();
for (int i = 1; i < args.length; i++) {
if (args[i].equals("-dir")) {
- dir = true;
segDir = new File(args[++i]);
+ File[] files = fs.listFiles(segDir, new FileFilter() {
+ public boolean accept(File f) {
+ try {
+ if (fs.isDirectory(f)) return true;
+ } catch (IOException ioe) {};
+ return false;
+ }
+ });
+ if (files != null) segs.addAll(Arrays.asList(files));
break;
} else segs.add(new File(args[i]));
}
- if (dir) linkDb.invert(db, segDir);
- else linkDb.invert(db, (File[])segs.toArray(new File[segs.size()]));
+ linkDb.invert(db, (File[])segs.toArray(new File[segs.size()]));
}
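
The same incremental behavior applies from the command line (assuming the
usual "invertlinks" alias in bin/nutch maps to this class; the paths below
are examples):

  # merge inlinks from every segment under crawl/segments into crawl/linkdb
  bin/nutch invertlinks crawl/linkdb -dir crawl/segments

  # or update from explicitly named segments
  bin/nutch invertlinks crawl/linkdb crawl/segments/20060428120000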