You are viewing a plain-text version of this content. The canonical (hyperlinked) version is available from the original mailing-list archive page.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/06/09 22:51:10 UTC
svn commit: r189816 - in /lucene/nutch/branches/mapred: bin/ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/db/
Author: cutting
Date: Thu Jun 9 13:51:09 2005
New Revision: 189816
URL: http://svn.apache.org/viewcvs?rev=189816&view=rev
Log:
First working version of MapReduce-based indexing.
Modified:
lucene/nutch/branches/mapred/bin/nutch
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java
Modified: lucene/nutch/branches/mapred/bin/nutch
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/bin/nutch?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/bin/nutch (original)
+++ lucene/nutch/branches/mapred/bin/nutch Thu Jun 9 13:51:09 2005
@@ -36,7 +36,7 @@
echo " parse parse a segment's pages"
echo " updatedb update crawl db from segments after fetching"
echo " invertlinks create a linkdb from parsed segments"
- echo " index run the indexer on a segment's fetcher output"
+ echo " index run the indexer on parsed segments and linkdb"
echo " merge merge several segment indexes"
echo " dedup remove duplicates from a set of segment indexes"
echo " updatesegs update segments with link data from the db"
@@ -142,7 +142,7 @@
elif [ "$COMMAND" = "invertlinks" ] ; then
CLASS=org.apache.nutch.crawl.LinkDb
elif [ "$COMMAND" = "index" ] ; then
- CLASS=org.apache.nutch.indexer.IndexSegment
+ CLASS=org.apache.nutch.crawl.Indexer
elif [ "$COMMAND" = "merge" ] ; then
CLASS=org.apache.nutch.indexer.IndexMerger
elif [ "$COMMAND" = "dedup" ] ; then
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDatum.java Thu Jun 9 13:51:09 2005
@@ -41,8 +41,10 @@
public static final byte STATUS_FETCH_RETRY = 6;
public static final byte STATUS_FETCH_GONE = 7;
+ private static final float MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000;
+
private byte status;
- private long nextFetch = System.currentTimeMillis();
+ private long fetchTime = System.currentTimeMillis();
private byte retries;
private float fetchInterval;
private int linkCount;
@@ -63,8 +65,12 @@
public byte getStatus() { return status; }
public void setStatus(int status) { this.status = (byte)status; }
- public long getNextFetchTime() { return nextFetch; }
- public void setNextFetchTime(long nextFetch) { this.nextFetch = nextFetch; }
+ public long getFetchTime() { return fetchTime; }
+ public void setFetchTime(long fetchTime) { this.fetchTime = fetchTime; }
+
+ public void setNextFetchTime() {
+ fetchTime += (long)(MILLISECONDS_PER_DAY*fetchInterval);
+ }
public byte getRetriesSinceFetch() { return retries; }
public void setRetriesSinceFetch(int retries) {this.retries = (byte)retries;}
@@ -94,7 +100,7 @@
throw new VersionMismatchException(CUR_VERSION, version);
status = in.readByte();
- nextFetch = in.readLong();
+ fetchTime = in.readLong();
retries = in.readByte();
fetchInterval = in.readFloat();
linkCount = in.readInt();
@@ -106,7 +112,7 @@
public void write(DataOutput out) throws IOException {
out.writeByte(CUR_VERSION); // store current version
out.writeByte(status);
- out.writeLong(nextFetch);
+ out.writeLong(fetchTime);
out.writeByte(retries);
out.writeFloat(fetchInterval);
out.writeInt(linkCount);
@@ -115,7 +121,7 @@
/** Copy the contents of another instance into this instance. */
public void set(CrawlDatum that) {
this.status = that.status;
- this.nextFetch = that.nextFetch;
+ this.fetchTime = that.fetchTime;
this.retries = that.retries;
this.fetchInterval = that.fetchInterval;
this.linkCount = that.linkCount;
@@ -157,7 +163,7 @@
StringBuffer buf = new StringBuffer();
buf.append("Version: " + CUR_VERSION + "\n");
buf.append("Status: " + getStatus() + "\n");
- buf.append("Next fetch: " + new Date(getNextFetchTime()) + "\n");
+ buf.append("Fetch time: " + new Date(getFetchTime()) + "\n");
buf.append("Retries since fetch: " + getRetriesSinceFetch() + "\n");
buf.append("Retry interval: " + getFetchInterval() + " days\n");
buf.append("Link Count: " + getLinkCount() + "\n");
@@ -170,7 +176,7 @@
CrawlDatum other = (CrawlDatum)o;
return
(this.status == other.status) &&
- (this.nextFetch == other.nextFetch) &&
+ (this.fetchTime == other.fetchTime) &&
(this.retries == other.retries) &&
(this.fetchInterval == other.fetchInterval) &&
(this.linkCount == other.linkCount);
@@ -179,7 +185,7 @@
public int hashCode() {
return
status ^
- ((int)nextFetch) ^
+ ((int)fetchTime) ^
retries ^
Float.floatToIntBits(fetchInterval) ^
linkCount;
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/CrawlDbReducer.java Thu Jun 9 13:51:09 2005
@@ -75,6 +75,7 @@
case CrawlDatum.STATUS_FETCH_SUCCESS: // succesful fetch
result = highest; // use new entry
result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
+ result.setNextFetchTime();
break;
case CrawlDatum.STATUS_FETCH_RETRY: // temporary failure
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Thu Jun 9 13:51:09 2005
@@ -35,6 +35,8 @@
public static final Logger LOG =
LogFormatter.getLogger("org.apache.nutch.fetcher.Fetcher");
+ public static final String DIGEST_KEY = "nutch.content.digest";
+
private RecordReader input;
private OutputCollector output;
@@ -155,6 +157,9 @@
String url = key.toString();
content = new Content(url, url, new byte[0], "", new Properties());
}
+
+ content.getMetadata().setProperty // add digest to metadata
+ (DIGEST_KEY, MD5Hash.digest(content.getContent()).toString());
try {
output.collect(key, new FetcherOutput(datum, content));
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Generator.java Thu Jun 9 13:51:09 2005
@@ -55,7 +55,7 @@
if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_GONE)
return; // don't retry
- if (crawlDatum.getNextFetchTime() > curTime)
+ if (crawlDatum.getFetchTime() > curTime)
return; // not time yet
output.collect(crawlDatum, key); // invert for sort by linkCount
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Indexer.java Thu Jun 9 13:51:09 2005
@@ -27,7 +27,11 @@
import org.apache.nutch.mapred.*;
import org.apache.nutch.parse.*;
import org.apache.nutch.analysis.*;
+
import org.apache.nutch.indexer.*;
+import org.apache.nutch.pagedb.FetchListEntry;
+import org.apache.nutch.fetcher.FetcherOutput;
+import org.apache.nutch.db.Page;
import org.apache.lucene.index.*;
import org.apache.lucene.document.*;
@@ -80,6 +84,9 @@
writer.setTermIndexInterval
(job.getInt("indexer.termIndexInterval", 128));
writer.maxFieldLength = job.getInt("indexer.max.tokens", 10000);
+ //writer.infoStream = LogFormatter.getLogStream(LOG, Level.FINE);
+ writer.setUseCompoundFile(false);
+ writer.setSimilarity(new NutchSimilarity());
return new RecordWriter() {
@@ -118,12 +125,15 @@
public void reduce(WritableComparable key, Iterator values,
OutputCollector output) throws IOException {
Inlinks inlinks = null;
+ CrawlDatum crawlDatum = null;
ParseData parseData = null;
ParseText parseText = null;
while (values.hasNext()) {
Object value = ((ObjectWritable)values.next()).get(); // unwrap
if (value instanceof Inlinks) {
inlinks = (Inlinks)value;
+ } else if (value instanceof CrawlDatum) {
+ crawlDatum = (CrawlDatum)value;
} else if (value instanceof ParseData) {
parseData = (ParseData)value;
} else if (value instanceof ParseText) {
@@ -133,30 +143,47 @@
}
}
- if (parseText == null || parseData == null) {
+ if (crawlDatum == null || parseText == null || parseData == null) {
return; // only have inlinks
}
Document doc = new Document();
+ Properties meta = parseData.getMetadata();
- // add docno & segment, used to map from merged index back to segment files
- //doc.add(Field.UnIndexed("docNo", Long.toString(docNo, 16)));
- //doc.add(Field.UnIndexed("segment", segmentName));
+ // add segment, used to map from merged index back to segment files
+ doc.add(Field.UnIndexed("segment",
+ meta.getProperty(ParseSegment.SEGMENT_NAME_KEY)));
// add digest, used by dedup
- //doc.add(Field.UnIndexed("digest", fo.getMD5Hash().toString()));
+ doc.add(Field.UnIndexed("digest", meta.getProperty(Fetcher.DIGEST_KEY)));
- // 4. Apply boost to all indexed fields.
+ // compute boost
float boost =
- IndexSegment.calculateBoost(1.0f,scorePower, boostByLinkCount,
+ IndexSegment.calculateBoost(1.0f, scorePower, boostByLinkCount,
inlinks == null ? 0 : inlinks.size());
+ // apply boost to all indexed fields.
doc.setBoost(boost);
// store boost for use by explain and dedup
doc.add(Field.UnIndexed("boost", Float.toString(boost)));
+// LOG.info("Url: "+key.toString());
+// LOG.info("Title: "+parseData.getTitle());
+// LOG.info(crawlDatum.toString());
+// if (inlinks != null) {
+// LOG.info(inlinks.toString());
+// }
+
try {
- doc = IndexingFilters.filter(doc, new ParseImpl(parseText, parseData),
- null);
+ // dummy up a FetcherOutput so that we can use existing indexing filters
+ // TODO: modify IndexingFilter interface to use Inlinks, etc.
+ String[] anchors = inlinks!=null ? inlinks.getAnchors() : new String[0];
+ FetcherOutput fo =
+ new FetcherOutput(new FetchListEntry(true,new Page((UTF8)key),anchors),
+ null, null);
+ fo.setFetchDate(crawlDatum.getFetchTime());
+
+ // run indexing filters
+ doc = IndexingFilters.filter(doc,new ParseImpl(parseText, parseData),fo);
} catch (IndexingException e) {
LOG.warning("Error indexing "+key+": "+e);
return;
@@ -165,29 +192,23 @@
output.collect(key, new ObjectWritable(doc));
}
- public void index(File indexDir, File segmentsDir) throws IOException {
- JobConf job = Indexer.createJob(getConf(), indexDir);
- job.setInputDir(segmentsDir);
- job.set("mapred.input.subdir", ParseData.DIR_NAME);
- JobClient.runJob(job);
- }
+ public void index(File indexDir, File linkDb, File[] segments)
+ throws IOException {
+
+ JobConf job = new JobConf(getConf());
- public void index(File indexDir, File[] segments) throws IOException {
- JobConf job = Indexer.createJob(getConf(), indexDir);
for (int i = 0; i < segments.length; i++) {
+ job.addInputDir(new File(segments[i], CrawlDatum.FETCH_DIR_NAME));
job.addInputDir(new File(segments[i], ParseData.DIR_NAME));
+ job.addInputDir(new File(segments[i], ParseText.DIR_NAME));
}
- JobClient.runJob(job);
- }
- private static JobConf createJob(NutchConf config, File indexDir) {
- JobConf job = new JobConf(config);
+ job.addInputDir(new File(linkDb, LinkDb.CURRENT_NAME));
job.setInputFormat(InputFormat.class);
job.setInputKeyClass(UTF8.class);
job.setInputValueClass(ObjectWritable.class);
- job.setMapperClass(Indexer.class);
//job.setCombinerClass(Indexer.class);
job.setReducerClass(Indexer.class);
@@ -196,20 +217,23 @@
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(ObjectWritable.class);
- return job;
+ JobClient.runJob(job);
}
public static void main(String[] args) throws Exception {
Indexer indexer = new Indexer(NutchConf.get());
if (args.length < 2) {
- System.err.println("Usage: <linkdb> <segments>");
+ System.err.println("Usage: <index> <linkdb> <segment> <segment> ...");
return;
}
- indexer.index(new File(args[0]), new File(args[1]));
- }
-
+ File[] segments = new File[args.length-2];
+ for (int i = 2; i < args.length; i++) {
+ segments[i-2] = new File(args[i]);
+ }
+ indexer.index(new File(args[0]), new File(args[1]), segments);
+ }
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Inlinks.java Thu Jun 9 13:51:09 2005
@@ -17,6 +17,7 @@
package org.apache.nutch.crawl;
import java.io.*;
+import java.net.*;
import java.util.*;
import org.apache.nutch.io.*;
@@ -50,5 +51,45 @@
((Writable)inlinks.get(i)).write(out);
}
}
+
+ public String toString() {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("Inlinks:\n");
+ for (int i = 0; i < inlinks.size(); i++) {
+ buffer.append(" ");
+ buffer.append(inlinks.get(i));
+ buffer.append("\n");
+ }
+ return buffer.toString();
+ }
+
+ /** Return the set of anchor texts. Only a single anchor with a given text
+ * is permitted from a given domain. */
+ public String[] getAnchors() throws IOException {
+ HashMap domainToAnchors = new HashMap();
+ ArrayList results = new ArrayList();
+ for (int i = 0; i < inlinks.size(); i++) {
+ Inlink inlink = (Inlink)inlinks.get(i);
+ String anchor = inlink.getAnchor();
+
+ if (anchor.length() == 0) // skip empty anchors
+ continue;
+ String domain = null; // extract domain name
+ try {
+ domain = new URL(inlink.getFromUrl()).getHost();
+ } catch (MalformedURLException e) {}
+ Set domainAnchors = (Set)domainToAnchors.get(domain);
+ if (domainAnchors == null) {
+ domainAnchors = new HashSet();
+ domainToAnchors.put(domain, domainAnchors);
+ }
+ if (domainAnchors.add(anchor)) { // new anchor from domain
+ results.add(anchor); // collect it
+ }
+ }
+
+ return (String[])results.toArray(new String[results.size()]);
+ }
+
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/ParseSegment.java Thu Jun 9 13:51:09 2005
@@ -35,7 +35,10 @@
public static final Logger LOG =
LogFormatter.getLogger(Parser.class.getName());
+ public static final String SEGMENT_NAME_KEY = "nutch.segment.name";
+
private float interval;
+ private String segmentName;
private UrlNormalizer urlNormalizer = UrlNormalizerFactory.getNormalizer();
@@ -47,6 +50,7 @@
public void configure(JobConf job) {
interval = job.getFloat("db.default.fetch.interval", 30f);
+ segmentName = job.get(SEGMENT_NAME_KEY);
}
public void map(WritableComparable key, Writable value,
@@ -65,6 +69,7 @@
}
if (status.isSuccess()) {
+ parse.getData().getMetadata().setProperty(SEGMENT_NAME_KEY, segmentName);
output.collect(key, new ParseImpl(parse.getText(), parse.getData()));
} else {
LOG.warning("Error parsing: "+key+": "+status.toString());
@@ -134,6 +139,9 @@
public void parse(File segment) throws IOException {
JobConf job = new JobConf(getConf());
+
+ job.set(SEGMENT_NAME_KEY, segment.getName());
+
job.setInputDir(new File(segment, Content.DIR_NAME));
job.setInputFormat(SequenceFileInputFormat.class);
job.setInputKeyClass(UTF8.class);
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java?rev=189816&r1=189815&r2=189816&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/db/Page.java Thu Jun 9 13:51:09 2005
@@ -64,6 +64,10 @@
md5 = new MD5Hash(); // initialize for readFields()
}
+ public Page(UTF8 url) {
+ this.url = url;
+ }
+
/** Construct a new, default page, due to be fetched. */
public Page(String urlString, MD5Hash md5) throws MalformedURLException {
setURL(urlString);