Posted to commits@nutch.apache.org by jn...@apache.org on 2010/06/30 12:36:29 UTC
svn commit: r959259 [4/12] - in /nutch/branches/nutchbase: ./ bin/ conf/
contrib/ docs/ ivy/ lib/ lib/jetty-ext/ src/engines/ src/gora/ src/java/
src/java/org/apache/nutch/analysis/ src/java/org/apache/nutch/clustering/
src/java/org/apache/nutch/crawl/...
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java Wed Jun 30 10:36:20 2010
@@ -3,75 +3,79 @@ package org.apache.nutch.crawl;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.nutch.crawl.Generator.SelectorEntry;
+import org.apache.nutch.crawl.GeneratorJob.SelectorEntry;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
-import org.apache.nutch.util.hbase.WebTableRow;
-import org.apache.nutch.util.hbase.TableUtil;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.TableUtil;
+import org.gora.mapreduce.GoraMapper;
-public class GeneratorMapper
-extends TableMapper<SelectorEntry, WebTableRow> {
+public class GeneratorMapper
+extends GoraMapper<String, WebPage, SelectorEntry, WebPage> {
private URLFilters filters;
private URLNormalizers normalizers;
private boolean filter;
+ private boolean normalise;
private FetchSchedule schedule;
private ScoringFilters scoringFilters;
private long curTime;
@Override
- public void map(ImmutableBytesWritable key, Result result,
+ public void map(String reversedUrl, WebPage page,
Context context) throws IOException, InterruptedException {
- String reversedUrl = Bytes.toString(key.get());
String url = TableUtil.unreverseUrl(reversedUrl);
- WebTableRow row = new WebTableRow(result);
+ if (Mark.GENERATE_MARK.checkMark(page) != null) {
+ if (GeneratorJob.LOG.isDebugEnabled()) {
+ GeneratorJob.LOG.debug("Skipping " + url + "; already generated");
+ }
+ }
// If filtering is on don't generate URLs that don't pass URLFilters
try {
- url = normalizers.normalize(url, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+ if (normalise) {
+ url = normalizers.normalize(url, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+ }
if (filter && filters.filter(url) == null)
return;
} catch (URLFilterException e) {
- Generator.LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
+ GeneratorJob.LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
return;
}
// check fetch schedule
- if (!schedule.shouldFetch(url, row, curTime)) {
- if (Generator.LOG.isDebugEnabled()) {
- Generator.LOG.debug("-shouldFetch rejected '" + url + "', fetchTime=" +
- row.getFetchTime() + ", curTime=" + curTime);
+ if (!schedule.shouldFetch(url, page, curTime)) {
+ if (GeneratorJob.LOG.isDebugEnabled()) {
+ GeneratorJob.LOG.debug("-shouldFetch rejected '" + url + "', fetchTime=" +
+ page.getFetchTime() + ", curTime=" + curTime);
}
return;
}
-
- float score = row.getScore();
+ float score = page.getScore();
try {
- score = scoringFilters.generatorSortValue(url, row, score);
- } catch (ScoringFilterException e) {
- // ignore
+ score = scoringFilters.generatorSortValue(url, page, score);
+ } catch (ScoringFilterException e) {
+ //ignore
}
SelectorEntry entry = new SelectorEntry(url, score);
- context.write(entry, row);
-
+ context.write(entry, page);
}
@Override
public void setup(Context context) {
Configuration conf = context.getConfiguration();
filters = new URLFilters(conf);
- curTime = conf.getLong(Generator.CRAWL_GEN_CUR_TIME,
- System.currentTimeMillis());
- normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
- filter = conf.getBoolean(Generator.CRAWL_GENERATE_FILTER, true);
+ curTime =
+ conf.getLong(GeneratorJob.GENERATOR_CUR_TIME, System.currentTimeMillis());
+ normalizers =
+ new URLNormalizers(conf, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+ filter = conf.getBoolean(GeneratorJob.GENERATOR_FILTER, true);
+ normalise = conf.getBoolean(GeneratorJob.GENERATOR_NORMALISE, true);
schedule = FetchScheduleFactory.getFetchSchedule(conf);
scoringFilters = new ScoringFilters(conf);
}
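
For reference, the Gora-based mapper contract introduced here can be exercised on its own. Below is a minimal sketch (not part of this commit) that follows the same skip-if-already-marked pattern, using only the GoraMapper, WebPage and Mark types imported in the diff above:

    import java.io.IOException;
    import org.apache.nutch.storage.Mark;
    import org.apache.nutch.storage.WebPage;
    import org.gora.mapreduce.GoraMapper;

    public class PassUnmarkedMapper extends GoraMapper<String, WebPage, String, WebPage> {
      @Override
      protected void map(String reversedUrl, WebPage page, Context context)
          throws IOException, InterruptedException {
        // Rows that already carry a generate mark were selected in an earlier run.
        if (Mark.GENERATE_MARK.checkMark(page) != null) {
          return;
        }
        context.write(reversedUrl, page);
      }
    }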
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java Wed Jun 30 10:36:20 2010
@@ -3,51 +3,62 @@ package org.apache.nutch.crawl;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.Random;
+import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.mapreduce.TableReducer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.nutch.crawl.Generator.SelectorEntry;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.crawl.GeneratorJob.SelectorEntry;
+import org.apache.nutch.fetcher.FetcherJob.FetcherMapper;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.URLUtil;
+import org.gora.mapreduce.GoraReducer;
/** Reduce class for generate
- *
+ *
 * The #reduce() method writes a random integer to all generated URLs. This random
* number is then used by {@link FetcherMapper}.
*
*/
public class GeneratorReducer
-extends TableReducer<SelectorEntry, WebTableRow, SelectorEntry> {
+extends GoraReducer<SelectorEntry, WebPage, String, WebPage> {
private long limit;
- private long maxPerHost;
+ private long maxCount;
private long count = 0;
+ private boolean byDomain = false;
private Map<String, Integer> hostCountMap = new HashMap<String, Integer>();
- private Random random = new Random();
+ private Utf8 crawlId;
@Override
- protected void reduce(SelectorEntry key, Iterable<WebTableRow> values,
+ protected void reduce(SelectorEntry key, Iterable<WebPage> values,
Context context) throws IOException, InterruptedException {
- for (WebTableRow row : values) {
- if (maxPerHost > 0) {
- String host = key.host;
- Integer hostCount = hostCountMap.get(host);
+ for (WebPage page : values) {
+ if (maxCount > 0) {
+ String hostordomain;
+ if (byDomain) {
+ hostordomain = URLUtil.getDomainName(key.url);
+ } else {
+ hostordomain = URLUtil.getHost(key.url);
+ }
+
+ Integer hostCount = hostCountMap.get(hostordomain);
if (hostCount == null) {
- hostCountMap.put(host, 0);
+ hostCountMap.put(hostordomain, 0);
hostCount = 0;
}
- if (hostCount > maxPerHost) {
+ if (hostCount >= maxCount) {
return;
}
- hostCountMap.put(host, hostCount + 1);
+ hostCountMap.put(hostordomain, hostCount + 1);
}
if (count >= limit) {
return;
}
-
- row.putMeta(Generator.GENERATOR_MARK, Bytes.toBytes(random.nextInt()));
- row.makeRowMutation().writeToContext(key, context);
+
+ Mark.GENERATE_MARK.putMark(page, crawlId);
+ context.write(TableUtil.reverseUrl(key.url), page);
+ context.getCounter("Generator", "GENERATE_MARK").increment(1);
count++;
}
}
@@ -56,13 +67,20 @@ extends TableReducer<SelectorEntry, WebT
protected void setup(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
- long totalLimit = conf.getLong(Generator.CRAWL_TOP_N, Long.MAX_VALUE);
+ long totalLimit = conf.getLong(GeneratorJob.GENERATOR_TOP_N, Long.MAX_VALUE);
if (totalLimit == Long.MAX_VALUE) {
- limit = Long.MAX_VALUE;
+ limit = Long.MAX_VALUE;
} else {
limit = totalLimit / context.getNumReduceTasks();
}
- maxPerHost = conf.getLong(Generator.GENERATE_MAX_PER_HOST, -1);
+ maxCount = conf.getLong(GeneratorJob.GENERATOR_MAX_COUNT, -2);
+ crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID));
+ String countMode =
+ conf.get(GeneratorJob.GENERATOR_COUNT_MODE, GeneratorJob.GENERATOR_COUNT_VALUE_HOST);
+ if (countMode.equals(GeneratorJob.GENERATOR_COUNT_VALUE_DOMAIN)) {
+ byDomain = true;
+ }
+
}
-
+
}
\ No newline at end of file
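
The per-host/per-domain throttle added in reduce() can be read in isolation. A sketch of the same counting logic (illustrative only; it assumes the URLUtil.getHost/getDomainName signatures used in the diff above):

    import java.net.MalformedURLException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.nutch.util.URLUtil;

    public class GenerateThrottle {
      private final Map<String, Integer> counts = new HashMap<String, Integer>();

      /** Returns true if the URL may still be emitted under the maxCount limit. */
      public boolean admit(String url, boolean byDomain, long maxCount)
          throws MalformedURLException {
        String key = byDomain ? URLUtil.getDomainName(url) : URLUtil.getHost(url);
        Integer seen = counts.get(key);
        if (seen == null) {
          seen = 0;
        }
        if (seen >= maxCount) {
          return false;
        }
        counts.put(key, seen + 1);
        return true;
      }
    }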
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/InjectorJob.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/InjectorJob.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/InjectorJob.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/InjectorJob.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,249 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.gora.mapreduce.GoraMapper;
+import org.gora.mapreduce.GoraOutputFormat;
+
+/** This class takes a flat file of URLs and adds them to the table of pages to be
+ * crawled. Useful for bootstrapping the system.
+ * The URL files contain one URL per line, optionally followed by custom metadata
+ * separated by tabs, with the metadata key separated from the corresponding value by '='. <br>
+ * Note that some metadata keys are reserved: <br>
+ * - <i>nutch.score</i> : allows setting a custom score for a specific URL <br>
+ * - <i>nutch.fetchInterval</i> : allows setting a custom fetch interval for a specific URL <br>
+ * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
+ **/
+public class InjectorJob extends GoraMapper<String, WebPage, String, WebPage>
+ implements Tool {
+
+ public static final Log LOG = LogFactory.getLog(InjectorJob.class);
+
+ private Configuration conf;
+
+ private FetchSchedule schedule;
+
+ private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
+ private static final Utf8 YES_STRING = new Utf8("y");
+
+ static {
+ FIELDS.add(WebPage.Field.MARKERS);
+ FIELDS.add(WebPage.Field.STATUS);
+ }
+
+ /** metadata key reserved for setting a custom score for a specific URL */
+ public static String nutchScoreMDName = "nutch.score";
+ /**
+ * metadata key reserved for setting a custom fetchInterval for a specific URL
+ */
+ public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
+
+ public static class UrlMapper extends
+ Mapper<LongWritable, Text, String, WebPage> {
+ private URLNormalizers urlNormalizers;
+ private int interval;
+ private float scoreInjected;
+ private URLFilters filters;
+ private ScoringFilters scfilters;
+ private long curTime;
+
+ @Override
+ protected void setup(Context context) throws IOException,
+ InterruptedException {
+ urlNormalizers = new URLNormalizers(context.getConfiguration(),
+ URLNormalizers.SCOPE_INJECT);
+ interval = context.getConfiguration().getInt("db.fetch.interval.default",
+ 2592000);
+ filters = new URLFilters(context.getConfiguration());
+ scfilters = new ScoringFilters(context.getConfiguration());
+ scoreInjected = context.getConfiguration().getFloat("db.score.injected",
+ 1.0f);
+ curTime = context.getConfiguration().getLong("injector.current.time",
+ System.currentTimeMillis());
+ }
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context)
+ throws IOException, InterruptedException {
+ String url = value.toString();
+
+ // if tabs : metadata that could be stored
+ // must be name=value and separated by \t
+ float customScore = -1f;
+ int customInterval = interval;
+ Map<String, String> metadata = new TreeMap<String, String>();
+ if (url.indexOf("\t") != -1) {
+ String[] splits = url.split("\t");
+ url = splits[0];
+ for (int s = 1; s < splits.length; s++) {
+ // find separation between name and value
+ int indexEquals = splits[s].indexOf("=");
+ if (indexEquals == -1) {
+ // skip anything without a =
+ continue;
+ }
+ String metaname = splits[s].substring(0, indexEquals);
+ String metavalue = splits[s].substring(indexEquals + 1);
+ if (metaname.equals(nutchScoreMDName)) {
+ try {
+ customScore = Float.parseFloat(metavalue);
+ } catch (NumberFormatException nfe) {
+ }
+ } else if (metaname.equals(nutchFetchIntervalMDName)) {
+ try {
+ customInterval = Integer.parseInt(metavalue);
+ } catch (NumberFormatException nfe) {
+ }
+ } else
+ metadata.put(metaname, metavalue);
+ }
+ }
+ try {
+ url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
+ url = filters.filter(url); // filter the url
+ } catch (Exception e) {
+ LOG.warn("Skipping " + url + ":" + e);
+ url = null;
+ }
+ if (url == null)
+ return;
+
+ String reversedUrl = TableUtil.reverseUrl(url);
+ WebPage row = new WebPage();
+ row.setFetchTime(curTime);
+ row.setFetchInterval(customInterval);
+ if (customScore != -1)
+ row.setScore(customScore);
+ else {
+ row.setScore(scoreInjected);
+ try {
+ scfilters.injectedScore(url, row);
+ } catch (ScoringFilterException e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Cannot filter injected score for url " + url
+ + ", using default (" + e.getMessage() + ")");
+ }
+ row.setScore(scoreInjected);
+ }
+ }
+ // now add the metadata
+ Iterator<String> keysIter = metadata.keySet().iterator();
+ while (keysIter.hasNext()) {
+ String keymd = keysIter.next();
+ String valuemd = metadata.get(keymd);
+ row.putToMetadata(new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes()));
+ }
+ Mark.INJECT_MARK.putMark(row, YES_STRING);
+ context.write(reversedUrl, row);
+ }
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ schedule = FetchScheduleFactory.getFetchSchedule(conf);
+ // scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+ }
+
+ @Override
+ protected void map(String key, WebPage row, Context context)
+ throws IOException, InterruptedException {
+ if (Mark.INJECT_MARK.checkMark(row) == null) {
+ return;
+ }
+ Mark.INJECT_MARK.removeMark(row);
+ if (!row.isReadable(WebPage.Field.STATUS.getIndex())) {
+ row.setStatus(CrawlStatus.STATUS_UNFETCHED);
+ schedule.initializeSchedule(key, row);
+ // row.setScore(scoreInjected);
+ }
+
+ context.write(key, row);
+ }
+
+ void inject(Path urlDir) throws Exception {
+ LOG.info("InjectorJob: starting");
+ LOG.info("InjectorJob: urlDir: " + urlDir);
+
+ getConf().setLong("injector.current.time", System.currentTimeMillis());
+ Job job = new NutchJob(getConf(), "inject-p1 " + urlDir);
+ FileInputFormat.addInputPath(job, urlDir);
+ job.setMapperClass(UrlMapper.class);
+ job.setMapOutputKeyClass(String.class);
+ job.setMapOutputValueClass(WebPage.class);
+ job.setOutputFormatClass(GoraOutputFormat.class);
+ GoraOutputFormat.setOutput(job, String.class,
+ WebPage.class, StorageUtils.getDataStoreClass(getConf()), true);
+ job.setReducerClass(Reducer.class);
+ job.setNumReduceTasks(0);
+ job.waitForCompletion(true);
+
+ job = new NutchJob(getConf(), "inject-p2 " + urlDir);
+ StorageUtils.initMapperJob(job, FIELDS, String.class, WebPage.class,
+ InjectorJob.class);
+ job.setNumReduceTasks(0);
+ job.waitForCompletion(true);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length < 1) {
+ System.err.println("Usage: InjectorJob <url_dir>");
+ return -1;
+ }
+ try {
+ inject(new Path(args[0]));
+ LOG.info("InjectorJob: finished");
+ return -0;
+ } catch (Exception e) {
+ LOG.fatal("InjectorJob: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new InjectorJob(), args);
+ System.exit(res);
+ }
+}
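
Injection now runs as two jobs: "inject-p1" turns the text seed list into WebPage rows carrying INJECT_MARK, and "inject-p2" maps over the store, removes the mark and initializes status and fetch schedule. A seed file consumed by UrlMapper would look like the following, one URL per line with optional tab-separated metadata (hosts and the userType value are placeholders):

    http://www.example.com/ \t nutch.score=2.5 \t nutch.fetchInterval=2592000 \t userType=open_source
    http://www.example.org/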
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java Wed Jun 30 10:36:20 2010
@@ -21,34 +21,31 @@ import java.util.Collection;
import java.util.HashSet;
import org.apache.hadoop.io.MD5Hash;
-import org.apache.nutch.parse.Parse;
-import org.apache.nutch.util.hbase.HbaseColumn;
-import org.apache.nutch.util.hbase.WebTableColumns;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.storage.WebPage;
/**
* Default implementation of a page signature. It calculates an MD5 hash
* of the raw binary content of a page. In case there is no content, it
* calculates a hash from the page's URL.
- *
+ *
* @author Andrzej Bialecki <ab@getopt.org>
*/
public class MD5Signature extends Signature {
- private final static Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
-
+ private final static Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
static {
- COLUMNS.add(new HbaseColumn(WebTableColumns.CONTENT));
+ FIELDS.add(WebPage.Field.CONTENT);
}
-
+
@Override
- public byte[] calculate(WebTableRow row, Parse parse) {
- byte[] data = row.getContent();
+ public byte[] calculate(WebPage page) {
+ byte[] data = page.getContent().array();
return MD5Hash.digest(data).getDigest();
}
@Override
- public Collection<HbaseColumn> getColumns() {
- return COLUMNS;
+ public Collection<WebPage.Field> getFields() {
+ return FIELDS;
}
}
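
calculate() above reads the raw bytes out of the Avro ByteBuffer returned by getContent(). A defensive variant (a sketch, not part of the commit) that keeps the fallback described in the class javadoc when a page has no content:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.MD5Hash;
    import org.apache.nutch.storage.WebPage;

    final class SignatureBytes {
      /** Hash the content if present, otherwise fall back to hashing the URL. */
      static byte[] digest(String url, WebPage page) {
        ByteBuffer content = page.getContent();
        byte[] data = (content != null) ? content.array() : url.getBytes();
        return MD5Hash.digest(data).getDigest();
      }
    }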
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java Wed Jun 30 10:36:20 2010
@@ -20,12 +20,12 @@ import org.apache.hadoop.io.Writable;
import org.apache.nutch.util.GenericWritableConfigurable;
public class NutchWritable extends GenericWritableConfigurable {
-
+
private static Class<? extends Writable>[] CLASSES = null;
-
+
static {
CLASSES = new Class[] {
- org.apache.hadoop.io.NullWritable.class,
+ org.apache.hadoop.io.NullWritable.class,
org.apache.hadoop.io.LongWritable.class,
org.apache.hadoop.io.BytesWritable.class,
org.apache.hadoop.io.FloatWritable.class,
@@ -44,17 +44,16 @@ public class NutchWritable extends Gener
org.apache.nutch.parse.ParseStatus.class,
org.apache.nutch.protocol.Content.class,
org.apache.nutch.protocol.ProtocolStatus.class,
- org.apache.nutch.searcher.Hit.class,
- org.apache.nutch.searcher.HitDetails.class,
- org.apache.nutch.searcher.Hits.class,
+// org.apache.nutch.searcher.Hit.class,
+// org.apache.nutch.searcher.HitDetails.class,
+// org.apache.nutch.searcher.Hits.class,
org.apache.nutch.scoring.ScoreDatum.class,
- org.apache.nutch.util.hbase.WebTableRow.class,
- org.apache.hadoop.hbase.client.Result.class
+ org.apache.nutch.util.WebPageWritable.class,
};
}
public NutchWritable() { }
-
+
public NutchWritable(Writable instance) {
set(instance);
}
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java Wed Jun 30 10:36:20 2010
@@ -19,14 +19,12 @@ package org.apache.nutch.crawl;
import java.util.Collection;
-import org.apache.nutch.parse.Parse;
-import org.apache.nutch.util.hbase.HbaseColumn;
-import org.apache.nutch.util.hbase.WebTableRow;
import org.apache.hadoop.conf.Configured;
+import org.apache.nutch.storage.WebPage;
public abstract class Signature extends Configured {
-
- public abstract byte[] calculate(WebTableRow row, Parse parse);
-
- public abstract Collection<HbaseColumn> getColumns();
+
+ public abstract byte[] calculate(WebPage page);
+
+ public abstract Collection<WebPage.Field> getFields();
}
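
Under the new contract an implementation declares the WebPage fields it needs and computes a digest from the page alone. A hypothetical example (WebPage.Field.TITLE is an assumption; only getTitle() appears elsewhere in this commit):

    import java.util.Collection;
    import java.util.HashSet;
    import org.apache.hadoop.io.MD5Hash;
    import org.apache.nutch.crawl.Signature;
    import org.apache.nutch.storage.WebPage;

    public class TitleSignature extends Signature {
      // Assumption: the WebPage schema exposes a TITLE field alongside CONTENT.
      private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
      static {
        FIELDS.add(WebPage.Field.TITLE);
      }

      @Override
      public byte[] calculate(WebPage page) {
        String title = page.getTitle() == null ? "" : page.getTitle().toString();
        return MD5Hash.digest(title).getDigest();
      }

      @Override
      public Collection<WebPage.Field> getFields() {
        return FIELDS;
      }
    }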
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java Wed Jun 30 10:36:20 2010
@@ -22,17 +22,15 @@ import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-// Hadoop imports
import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.ObjectCache;
-import org.apache.nutch.util.hbase.HbaseColumn;
/**
* Factory class, which instantiates a Signature implementation according to the
 * current Configuration. This newly created instance is cached in the
 * Configuration instance, so that it can be retrieved later.
- *
+ *
* @author Andrzej Bialecki <ab@getopt.org>
*/
public class SignatureFactory {
@@ -58,9 +56,9 @@ public class SignatureFactory {
}
return impl;
}
-
- public static Collection<HbaseColumn> getColumns(Configuration conf) {
+
+ public static Collection<WebPage.Field> getFields(Configuration conf) {
Signature impl = getSignature(conf);
- return impl.getColumns();
+ return impl.getFields();
}
}
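
Typical use of the renamed accessor, as a sketch; the "db.signature.class" property name is an assumption carried over from earlier Nutch versions and does not appear in this diff:

    import java.util.Collection;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.crawl.SignatureFactory;
    import org.apache.nutch.storage.WebPage;
    import org.apache.nutch.util.NutchConfiguration;

    public class SignatureFieldsExample {
      public static void main(String[] args) {
        Configuration conf = NutchConfiguration.create();
        // Assumption: this property selects the Signature implementation.
        conf.set("db.signature.class", "org.apache.nutch.crawl.TextProfileSignature");
        Collection<WebPage.Field> fields = SignatureFactory.getFields(conf);
        System.out.println(fields);
      }
    }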
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java Wed Jun 30 10:36:20 2010
@@ -26,11 +26,7 @@ import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.io.MD5Hash;
-
-import org.apache.nutch.parse.Parse;
-import org.apache.nutch.util.hbase.HbaseColumn;
-import org.apache.nutch.util.hbase.WebTableColumns;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.storage.WebPage;
/**
* <p>An implementation of a page signature. It calculates an MD5 hash
@@ -54,26 +50,27 @@ import org.apache.nutch.util.hbase.WebTa
* in the order of decreasing frequency.</li>
* </ul>
* This list is then submitted to an MD5 hash calculation.
- *
+ *
* @author Andrzej Bialecki <ab@getopt.org>
*/
public class TextProfileSignature extends Signature {
- private final static Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
-
+ private final static Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
static {
- COLUMNS.add(new HbaseColumn(WebTableColumns.CONTENT));
+ FIELDS.add(WebPage.Field.CONTENT);
}
-
+
Signature fallback = new MD5Signature();
- public byte[] calculate(WebTableRow row, Parse parse) {
+ @Override
+ public byte[] calculate(WebPage page) {
int MIN_TOKEN_LEN = getConf().getInt("db.signature.text_profile.min_token_len", 2);
float QUANT_RATE = getConf().getFloat("db.signature.text_profile.quant_rate", 0.01f);
HashMap<String, Token> tokens = new HashMap<String, Token>();
String text = null;
- if (parse != null) text = parse.getText();
- if (text == null || text.length() == 0) return fallback.calculate(row, parse);
+ if (page.getText() != null) text = page.getText().toString();
+ if (text == null || text.length() == 0) return fallback.calculate(page);
StringBuffer curToken = new StringBuffer();
int maxFreq = 0;
for (int i = 0; i < text.length(); i++) {
@@ -138,24 +135,26 @@ public class TextProfileSignature extend
return MD5Hash.digest(newText.toString()).getDigest();
}
- public Collection<HbaseColumn> getColumns() {
- return COLUMNS;
+ @Override
+ public Collection<WebPage.Field> getFields() {
+ return FIELDS;
}
-
+
private static class Token {
public int cnt;
public String val;
-
+
public Token(int cnt, String val) {
this.cnt = cnt;
this.val = val;
}
-
+
+ @Override
public String toString() {
return val + " " + cnt;
}
}
-
+
private static class TokenComparator implements Comparator<Token> {
public int compare(Token t1, Token t2) {
return t2.cnt - t1.cnt;
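
The two tuning knobs read at the top of calculate() can be overridden through the configuration; the property names are taken from the diff above, the values are examples only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.util.NutchConfiguration;

    public class TextProfileTuning {
      public static Configuration tunedConf() {
        Configuration conf = NutchConfiguration.create();
        conf.setInt("db.signature.text_profile.min_token_len", 3);    // code default: 2
        conf.setFloat("db.signature.text_profile.quant_rate", 0.02f); // code default: 0.01f
        return conf;
      }
    }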
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLPartitioner.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLPartitioner.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLPartitioner.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLPartitioner.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.nutch.crawl.GeneratorJob.SelectorEntry;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Partitions URLs by host, domain name or IP address, depending on the value of the
+ * parameter 'partition.url.mode', which can be 'byHost', 'byDomain' or 'byIP'.
+ */
+public class URLPartitioner
+extends Partitioner<SelectorEntry, WebPage>
+implements Configurable {
+ private static final Log LOG = LogFactory.getLog(URLPartitioner.class);
+
+ public static final String PARTITION_MODE_KEY = "partition.url.mode";
+
+ public static final String PARTITION_MODE_HOST = "byHost";
+ public static final String PARTITION_MODE_DOMAIN = "byDomain";
+ public static final String PARTITION_MODE_IP = "byIP";
+
+ private Configuration conf;
+
+ private int seed;
+ private URLNormalizers normalizers;
+ private String mode = PARTITION_MODE_HOST;
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ seed = conf.getInt("partition.url.seed", 0);
+ mode = conf.get(PARTITION_MODE_KEY, PARTITION_MODE_HOST);
+ // check that the mode is known
+ if (!mode.equals(PARTITION_MODE_IP) && !mode.equals(PARTITION_MODE_DOMAIN)
+ && !mode.equals(PARTITION_MODE_HOST)) {
+ LOG.error("Unknown partition mode : " + mode + " - forcing to byHost");
+ mode = PARTITION_MODE_HOST;
+ }
+ normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_PARTITION);
+ }
+
+ public void setup(Configuration conf) {
+
+ }
+
+ @Override
+ public int getPartition(SelectorEntry key, WebPage value, int numReduceTasks) {
+ String urlString = key.url;
+ URL url = null;
+ int hashCode = urlString.hashCode();
+ try {
+ urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION);
+ url = new URL(urlString);
+ hashCode = url.getHost().hashCode();
+ } catch (MalformedURLException e) {
+ LOG.warn("Malformed URL: '" + urlString + "'");
+ }
+
+ if (mode.equals(PARTITION_MODE_DOMAIN) && url != null) hashCode = URLUtil
+ .getDomainName(url).hashCode();
+ else if (mode.equals(PARTITION_MODE_IP)) {
+ try {
+ InetAddress address = InetAddress.getByName(url.getHost());
+ hashCode = address.getHostAddress().hashCode();
+ } catch (UnknownHostException e) {
+ GeneratorJob.LOG.info("Couldn't find IP for host: " + url.getHost());
+ }
+ }
+
+ // make hosts wind up in different partitions on different runs
+ hashCode ^= seed;
+
+ return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
+ }
+}
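
A sketch of wiring the partitioner into a job and selecting domain-based partitioning via the constants defined above (the job name is arbitrary):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.nutch.crawl.URLPartitioner;
    import org.apache.nutch.util.NutchConfiguration;
    import org.apache.nutch.util.NutchJob;

    public class PartitionerWiring {
      public static Job byDomainJob() throws Exception {
        Configuration conf = NutchConfiguration.create();
        conf.set(URLPartitioner.PARTITION_MODE_KEY, URLPartitioner.PARTITION_MODE_DOMAIN);
        Job job = new NutchJob(conf, "example-generate");
        // Reduce partitions, and therefore fetch lists, are grouped per domain.
        job.setPartitionerClass(URLPartitioner.class);
        return job;
      }
    }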
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLWebPage.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLWebPage.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLWebPage.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLWebPage.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,32 @@
+package org.apache.nutch.crawl;
+
+import org.apache.nutch.storage.WebPage;
+
+public class URLWebPage {
+
+ private String url;
+
+ private WebPage datum;
+
+ public URLWebPage(String url, WebPage datum) {
+ this.url = url;
+ this.datum = datum;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public WebPage getDatum() {
+ return datum;
+ }
+
+ public void setDatum(WebPage datum) {
+ this.datum = datum;
+ }
+
+}
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/WebTableReader.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/WebTableReader.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/WebTableReader.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/WebTableReader.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,466 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.regex.Pattern;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.gora.mapreduce.GoraMapper;
+import org.gora.query.Query;
+import org.gora.query.Result;
+import org.gora.store.DataStore;
+
+/**
+ * Displays information about the entries of the webtable
+ **/
+
+public class WebTableReader extends Configured implements Tool {
+
+ public static final Log LOG = LogFactory.getLog(WebTableReader.class);
+
+ public static class WebTableStatMapper extends
+ GoraMapper<String, WebPage, Text, LongWritable> {
+ LongWritable COUNT_1 = new LongWritable(1);
+ private boolean sort = false;
+
+ public WebTableStatMapper() {
+ }
+
+ public void setup(Context context) {
+ sort = context.getConfiguration().getBoolean("db.reader.stats.sort",
+ false);
+ }
+
+ public void close() {
+ }
+
+ @Override
+ protected void map(
+ String key,
+ WebPage value,
+ org.apache.hadoop.mapreduce.Mapper<String, WebPage, Text, LongWritable>.Context context)
+ throws IOException, InterruptedException {
+ context.write(new Text("T"), COUNT_1);
+ context.write(new Text("status " + value.getStatus()), COUNT_1);
+ context.write(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
+ context.write(new Text("s"), new LongWritable(
+ (long) (value.getScore() * 1000.0)));
+ if (sort) {
+ URL u = new URL(TableUtil.unreverseUrl(key.toString()));
+ String host = u.getHost();
+ context.write(new Text("status " + value.getStatus() + " " + host),
+ COUNT_1);
+ }
+
+ }
+ }
+
+ public static class WebTableStatCombiner extends
+ Reducer<Text, LongWritable, Text, LongWritable> {
+ LongWritable val = new LongWritable();
+
+ public void setup(Context context) {
+ }
+
+ public void cleanup(Context context) {
+ }
+
+ @Override
+ public void reduce(
+ Text key,
+ Iterable<LongWritable> values,
+ org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)
+ throws IOException, InterruptedException {
+ val.set(0L);
+ Iterator<LongWritable> iter = values.iterator();
+ String k = ((Text) key).toString();
+ if (!k.equals("s")) {
+ while (iter.hasNext()) {
+ LongWritable cnt = iter.next();
+ val.set(val.get() + cnt.get());
+ }
+ context.write(key, val);
+ } else {
+ long total = 0;
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ while (iter.hasNext()) {
+ LongWritable cnt = iter.next();
+ if (cnt.get() < min)
+ min = cnt.get();
+ if (cnt.get() > max)
+ max = cnt.get();
+ total += cnt.get();
+ }
+ context.write(new Text("scn"), new LongWritable(min));
+ context.write(new Text("scx"), new LongWritable(max));
+ context.write(new Text("sct"), new LongWritable(total));
+ }
+ }
+
+ }
+
+ public static class WebTableStatReducer extends
+ Reducer<Text, LongWritable, Text, LongWritable> {
+
+ public void cleanup(Context context) {
+ }
+
+ @Override
+ protected void reduce(
+ Text key,
+ Iterable<LongWritable> values,
+ org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)
+ throws IOException, InterruptedException {
+ Iterator<LongWritable> iter = values.iterator();
+ String k = ((Text) key).toString();
+ if (k.equals("T")) {
+ // sum all values for this key
+ long sum = 0;
+ while (iter.hasNext()) {
+ sum += iter.next().get();
+ }
+ // output sum
+ context.write(key, new LongWritable(sum));
+ } else if (k.startsWith("status") || k.startsWith("retry")) {
+ LongWritable cnt = new LongWritable();
+ while (iter.hasNext()) {
+ LongWritable val = iter.next();
+ cnt.set(cnt.get() + val.get());
+ }
+ context.write(key, cnt);
+ } else if (k.equals("scx")) {
+ LongWritable cnt = new LongWritable(Long.MIN_VALUE);
+ while (iter.hasNext()) {
+ LongWritable val = iter.next();
+ if (cnt.get() < val.get())
+ cnt.set(val.get());
+ }
+ context.write(key, cnt);
+ } else if (k.equals("scn")) {
+ LongWritable cnt = new LongWritable(Long.MAX_VALUE);
+ while (iter.hasNext()) {
+ LongWritable val = iter.next();
+ if (cnt.get() > val.get())
+ cnt.set(val.get());
+ }
+ context.write(key, cnt);
+ } else if (k.equals("sct")) {
+ LongWritable cnt = new LongWritable();
+ while (iter.hasNext()) {
+ LongWritable val = iter.next();
+ cnt.set(cnt.get() + val.get());
+ }
+ context.write(key, cnt);
+ }
+ }
+
+ }
+
+ public void processStatJob(boolean sort) throws IOException,
+ ClassNotFoundException, InterruptedException {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("WebTable statistics start");
+ }
+
+ Path tmpFolder = new Path(getConf().get("mapred.temp.dir", ".")
+ + "stat_tmp" + System.currentTimeMillis());
+
+ Job job = new NutchJob(getConf(), "db_stats");
+
+ job.getConfiguration().setBoolean("db.reader.stats.sort", sort);
+
+ DataStore<String, WebPage> store = StorageUtils.createDataStore(job
+ .getConfiguration(), String.class, WebPage.class);
+ Query<String, WebPage> query = store.newQuery();
+ query.setFields(WebPage._ALL_FIELDS);
+
+ GoraMapper.initMapperJob(job, query, store, Text.class, LongWritable.class,
+ WebTableStatMapper.class, null, true);
+
+ job.setCombinerClass(WebTableStatCombiner.class);
+ job.setReducerClass(WebTableStatReducer.class);
+
+ FileOutputFormat.setOutputPath(job, tmpFolder);
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(LongWritable.class);
+
+ boolean success = job.waitForCompletion(true);
+
+ FileSystem fileSystem = FileSystem.get(getConf());
+
+ if (!success) {
+ fileSystem.delete(tmpFolder, true);
+ return;
+ }
+
+ Text key = new Text();
+ LongWritable value = new LongWritable();
+
+ SequenceFile.Reader[] readers = org.apache.hadoop.mapred.SequenceFileOutputFormat
+ .getReaders(getConf(), tmpFolder);
+
+ TreeMap<String, LongWritable> stats = new TreeMap<String, LongWritable>();
+ for (int i = 0; i < readers.length; i++) {
+ SequenceFile.Reader reader = readers[i];
+ while (reader.next(key, value)) {
+ String k = key.toString();
+ LongWritable val = stats.get(k);
+ if (val == null) {
+ val = new LongWritable();
+ if (k.equals("scx"))
+ val.set(Long.MIN_VALUE);
+ if (k.equals("scn"))
+ val.set(Long.MAX_VALUE);
+ stats.put(k, val);
+ }
+ if (k.equals("scx")) {
+ if (val.get() < value.get())
+ val.set(value.get());
+ } else if (k.equals("scn")) {
+ if (val.get() > value.get())
+ val.set(value.get());
+ } else {
+ val.set(val.get() + value.get());
+ }
+ }
+ reader.close();
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Statistics for WebTable: ");
+ LongWritable totalCnt = stats.get("T");
+ stats.remove("T");
+ LOG.info("TOTAL urls:\t" + totalCnt.get());
+ for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
+ String k = entry.getKey();
+ LongWritable val = entry.getValue();
+ if (k.equals("scn")) {
+ LOG.info("min score:\t" + (float) (val.get() / 1000.0f));
+ } else if (k.equals("scx")) {
+ LOG.info("max score:\t" + (float) (val.get() / 1000.0f));
+ } else if (k.equals("sct")) {
+ LOG.info("avg score:\t"
+ + (float) ((((double) val.get()) / totalCnt.get()) / 1000.0));
+ } else if (k.startsWith("status")) {
+ String[] st = k.split(" ");
+ int code = Integer.parseInt(st[1]);
+ if (st.length > 2)
+ LOG.info(" " + st[2] + " :\t" + val);
+ else
+ LOG.info(st[0] + " " + code + " ("
+ + CrawlDatum.getStatusName((byte) code) + "):\t" + val);
+ } else
+ LOG.info(k + ":\t" + val);
+ }
+ }
+ // removing the tmp folder
+ fileSystem.delete(tmpFolder, true);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("WebTable statistics: done");
+ }
+
+ }
+
+ /** Prints out the entry to the standard out **/
+ private void read(String key) throws ClassNotFoundException, IOException {
+ DataStore<String, WebPage> datastore = StorageUtils.createDataStore(getConf(),
+ String.class, WebPage.class);
+
+ Query<String, WebPage> query = datastore.newQuery();
+ String reversedUrl = TableUtil.reverseUrl(key);
+ query.setKey(reversedUrl);
+
+ Result<String, WebPage> result = datastore.execute(query);
+ boolean found = false;
+ // should happen only once
+ while (result.next()) {
+ WebPage page = result.get();
+ String skey = result.getKey();
+ // we should not get to this point but nevermind
+ if (page == null || skey == null)
+ break;
+ found = true;
+ String url = TableUtil.unreverseUrl(skey);
+ System.out.println(getPageRepresentation(url, page));
+ }
+ if (!found)
+ System.out.println(key + " not found");
+ result.close();
+ datastore.close();
+ }
+
+ /** Filters the entries from the table based on a regex **/
+ public static class WebTableRegexMapper extends
+ GoraMapper<String, WebPage, Text, Text> {
+
+ static final String regexParamName = "webtable.url.regex";
+
+ public WebTableRegexMapper() {
+ }
+
+ private Pattern regex = null;
+
+ @Override
+ protected void map(
+ String key,
+ WebPage value,
+ org.apache.hadoop.mapreduce.Mapper<String, WebPage, Text, Text>.Context context)
+ throws IOException, InterruptedException {
+ // checks whether the Key passes the regex
+ String url = TableUtil.unreverseUrl(key.toString());
+ if (regex.matcher(url).matches()) {
+ context.write(new Text(url),
+ new Text(getPageRepresentation(key, value)));
+ }
+ }
+
+ @Override
+ protected void setup(
+ org.apache.hadoop.mapreduce.Mapper<String, WebPage, Text, Text>.Context context)
+ throws IOException, InterruptedException {
+ regex = Pattern.compile(context.getConfiguration().get(regexParamName,
+ ".+"));
+ }
+
+ }
+
+ public void processDumpJob(String output, Configuration config, String regex)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("WebTable dump: starting");
+ }
+
+ Path outFolder = new Path(output);
+ Job job = new NutchJob(getConf(), "db_dump");
+
+ job.getConfiguration().set(WebTableRegexMapper.regexParamName, regex);
+
+ DataStore<String, WebPage> store = StorageUtils.createDataStore(job
+ .getConfiguration(), String.class, WebPage.class);
+ Query<String, WebPage> query = store.newQuery();
+ query.setFields(WebPage._ALL_FIELDS);
+
+ GoraMapper.initMapperJob(job, query, store, Text.class, Text.class,
+ WebTableRegexMapper.class, null, true);
+
+ FileOutputFormat.setOutputPath(job, outFolder);
+ job.setOutputFormatClass(TextOutputFormat.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ boolean success = job.waitForCompletion(true);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("WebTable dump: done");
+ }
+ }
+
+ private static String getPageRepresentation(String key, WebPage page) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(key).append("\n");
+ sb.append("baseUrl:\t" + page.getBaseUrl()).append("\n");
+ sb.append("status:\t").append(page.getStatus()).append(" (").append(
+ CrawlStatus.getName((byte) page.getStatus())).append(")\n");
+ sb.append("parse_status:\t" + page.getParseStatus()).append("\n");
+ sb.append("title:\t" + page.getTitle()).append("\n");
+ sb.append("score:\t" + page.getScore()).append("\n");
+
+ Map<Utf8, ByteBuffer> metadata = page.getMetadata();
+ if (metadata != null) {
+ Iterator<Entry<Utf8, ByteBuffer>> iterator = metadata.entrySet()
+ .iterator();
+ while (iterator.hasNext()) {
+ Entry<Utf8, ByteBuffer> entry = iterator.next();
+ sb.append("metadata " + entry.getKey().toString()).append(" : \t")
+ .append(Bytes.toString(entry.getValue().array())).append("\n");
+ }
+ }
+ return sb.toString();
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new WebTableReader(),
+ args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 1) {
+ System.err
+ .println("Usage: WebTableReader (-stats | -url [url] | -dump <out_dir> [-regex regex])");
+ System.err
+ .println("\t-stats [-sort] \tprint overall statistics to System.out");
+ System.err.println("\t\t[-sort]\tlist status sorted by host");
+ System.err
+ .println("\t-url <url>\tprint information on <url> to System.out");
+ System.err
+ .println("\t-dump <out_dir> [-regex regex]\tdump the webtable to a text file in <out_dir>");
+ System.err
+ .println("\t\t[-regex]\tfilter on the URL of the webtable entry");
+ return -1;
+ }
+ String param = null;
+ try {
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-url")) {
+ param = args[++i];
+ read(param);
+ return 0;
+ } else if (args[i].equals("-stats")) {
+ boolean toSort = false;
+ if (i < args.length - 1 && "-sort".equals(args[i + 1])) {
+ toSort = true;
+ i++;
+ }
+ processStatJob(toSort);
+ } else if (args[i].equals("-dump")) {
+ param = args[++i];
+ String regex = ".+";
+ if (i < args.length - 1 && "-regex".equals(args[i + 1]))
+ regex = args[i = i + 2];
+ processDumpJob(param, getConf(), regex);
+ }
+ }
+ } catch (Exception e) {
+ LOG.fatal("WebTableReader: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ return 0;
+ }
+
+}
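
Example invocations matching the usage text above (the output directory, URL and regex are placeholders):

    WebTableReader -stats -sort
    WebTableReader -url http://www.example.com/
    WebTableReader -dump dump_dir -regex .*example.*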
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java Wed Jun 30 10:36:20 2010
@@ -4,46 +4,45 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.nutch.storage.WebPage;
+import org.gora.util.IOUtils;
-public class FetchEntry implements Writable {
+public class FetchEntry extends Configured implements Writable {
+
+ private String key;
+ private WebPage page;
- private ImmutableBytesWritable key;
- private Result row;
-
public FetchEntry() {
- key = new ImmutableBytesWritable();
- row = new Result();
- }
-
- public FetchEntry(FetchEntry fe) {
- this.key = new ImmutableBytesWritable(fe.key.get().clone());
+ super(null);
}
-
- public FetchEntry(ImmutableBytesWritable key, Result row) {
+
+ public FetchEntry(Configuration conf, String key, WebPage page) {
+ super(conf);
this.key = key;
- this.row = row;
+ this.page = page;
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
- key.readFields(in);
- row.readFields(in);
+ key = Text.readString(in);
+ page = IOUtils.deserialize(getConf(), in, null, WebPage.class);
}
@Override
public void write(DataOutput out) throws IOException {
- key.write(out);
- row.write(out);
+ Text.writeString(out, key);
+ IOUtils.serialize(getConf(), out, page, WebPage.class);
}
- public ImmutableBytesWritable getKey() {
+ public String getKey() {
return key;
}
- public Result getRow() {
- return row;
+ public WebPage getWebPage() {
+ return page;
}
}
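
FetchEntry now extends Configured because the Gora IOUtils.deserialize call above needs a Configuration to reconstruct the WebPage. A round-trip sketch (the key is illustrative, and it assumes the Gora serializations are registered in the Configuration, as the storage utilities normally arrange):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.nutch.fetcher.FetchEntry;
    import org.apache.nutch.storage.WebPage;

    public class FetchEntryRoundTrip {
      public static FetchEntry roundTrip(Configuration conf, String reversedUrl, WebPage page)
          throws Exception {
        FetchEntry original = new FetchEntry(conf, reversedUrl, page);
        DataOutputBuffer out = new DataOutputBuffer();
        original.write(out);

        FetchEntry copy = new FetchEntry();
        copy.setConf(conf); // required so readFields() can deserialize the WebPage
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        copy.readFields(in);
        return copy;
      }
    }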
Added: nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherJob.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherJob.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherJob.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherJob.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,245 @@
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.GeneratorJob;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.parse.ParserJob;
+import org.apache.nutch.protocol.ProtocolFactory;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.gora.mapreduce.GoraMapper;
+
+/**
+ * Multi-threaded fetcher.
+ *
+ */
+public class FetcherJob implements Tool {
+
+ public static final String PROTOCOL_REDIR = "protocol";
+
+ public static final int PERM_REFRESH_TIME = 5;
+
+ public static final Utf8 REDIRECT_DISCOVERED = new Utf8("___rdrdsc__");
+
+ private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
+ static {
+ FIELDS.add(WebPage.Field.MARKERS);
+ FIELDS.add(WebPage.Field.REPR_URL);
+ }
+
+ /**
+ * <p>
+ * Mapper class for Fetcher.
+ * </p>
+ * <p>
+ * This class reads the random integer written by {@link GeneratorJob} as its key
+ * while outputting the actual key and value arguments through a
+ * {@link FetchEntry} instance.
+ * </p>
+ * <p>
+ * This approach (combined with the use of {@link PartitionUrlByHost}) makes
+ * sure that Fetcher is still polite while also randomizing the key order. If
+ * one host has a huge number of URLs in your table while other hosts have
+ * not, {@link FetcherReducer} will not be stuck on one host but process URLs
+ * from other hosts as well.
+ * </p>
+ */
+ public static class FetcherMapper
+ extends GoraMapper<String, WebPage, IntWritable, FetchEntry> {
+
+ private boolean shouldContinue;
+
+ private Utf8 crawlId;
+
+ private Random random = new Random();
+
+ @Override
+ protected void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ shouldContinue = conf.getBoolean("job.continue", false);
+ crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID, Nutch.ALL_CRAWL_ID_STR));
+ }
+
+ @Override
+ protected void map(String key, WebPage page, Context context)
+ throws IOException, InterruptedException {
+ Utf8 mark = Mark.GENERATE_MARK.checkMark(page);
+ if (!NutchJob.shouldProcess(mark, crawlId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different crawl id");
+ }
+ return;
+ }
+ if (shouldContinue && Mark.FETCH_MARK.checkMark(page) != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already fetched");
+ }
+ return;
+ }
+ context.write(new IntWritable(random.nextInt(65536)), new FetchEntry(context
+ .getConfiguration(), key, page));
+ }
+ }
+
+ public static final Log LOG = LogFactory.getLog(FetcherJob.class);
+
+ private Configuration conf;
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Collection<WebPage.Field> getFields(Job job) {
+ Collection<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
+ if (job.getConfiguration().getBoolean("fetcher.parse", true)) {
+ ParserJob parserJob = new ParserJob();
+ fields.addAll(parserJob.getFields(job));
+ }
+ ProtocolFactory protocolFactory = new ProtocolFactory(job.getConfiguration());
+ fields.addAll(protocolFactory.getFields());
+
+ return fields;
+ }
+
+ private void fetch(int threads, String crawlId, boolean shouldContinue, boolean isParsing)
+ throws Exception {
+ LOG.info("FetcherJob: starting");
+
+ checkConfiguration();
+
+ if (threads > 0) {
+ getConf().setInt("fetcher.threads.fetch", threads);
+ }
+ getConf().set(GeneratorJob.CRAWL_ID, crawlId);
+ getConf().setBoolean("fetcher.parse", isParsing);
+ getConf().setBoolean("job.continue", shouldContinue);
+
+ // set the actual time for the timelimit relative
+ // to the beginning of the whole job and not of a specific task
+ // otherwise it keeps trying again if a task fails
+ long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
+ if (timelimit != -1) {
+ timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
+ getConf().setLong("fetcher.timelimit", timelimit);
+ }
+
+ LOG.info("FetcherJob : timelimit set for : " + timelimit);
+ LOG.info("FetcherJob: threads: " + getConf().getInt("fetcher.threads.fetch", 10));
+ LOG.info("FetcherJob: parsing: " + getConf().getBoolean("fetcher.parse", true));
+ LOG.info("FetcherJob: continuing: " + getConf().getBoolean("job.continue", false));
+ if (crawlId.equals(Nutch.ALL_CRAWL_ID_STR)) {
+ LOG.info("FetcherJob: fetching all");
+ } else {
+ LOG.info("FetcherJob: crawlId: " + crawlId);
+ }
+
+ Job job = new NutchJob(getConf(), "fetch");
+
+ Collection<WebPage.Field> fields = getFields(job);
+ StorageUtils.initMapperJob(job, fields, IntWritable.class,
+ FetchEntry.class, FetcherMapper.class, PartitionUrlByHost.class, false);
+ StorageUtils.initReducerJob(job, FetcherReducer.class);
+
+ job.waitForCompletion(true);
+
+ LOG.info("FetcherJob: done");
+ }
+
+ private void checkConfiguration() {
+
+ // ensure that a value has been set for the agent name and that that
+ // agent name is the first value in the agents we advertise for robot
+ // rules parsing
+ String agentName = getConf().get("http.agent.name");
+ if (agentName == null || agentName.trim().length() == 0) {
+ String message = "Fetcher: No agents listed in 'http.agent.name'"
+ + " property.";
+ if (LOG.isFatalEnabled()) {
+ LOG.fatal(message);
+ }
+ throw new IllegalArgumentException(message);
+ } else {
+
+ // get all of the agents that we advertise
+ String agentNames = getConf().get("http.robots.agents");
+ StringTokenizer tok = new StringTokenizer(agentNames, ",");
+ ArrayList<String> agents = new ArrayList<String>();
+ while (tok.hasMoreTokens()) {
+ agents.add(tok.nextToken().trim());
+ }
+
+ // if the first one is not equal to our agent name, log fatal and throw
+ // an exception
+ if (!(agents.get(0)).equalsIgnoreCase(agentName)) {
+ String message = "Fetcher: Your 'http.agent.name' value should be "
+ + "listed first in 'http.robots.agents' property.";
+ LOG.warn(message);
+ }
+ }
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ int threads = -1;
+ boolean shouldContinue = false;
+ boolean isParsing = true;
+ String crawlId;
+
+ String usage = "Usage: FetcherJob (<crawl id> | -all) [-threads N] [-noParsing] [-continue]";
+
+ if (args.length == 0) {
+ System.err.println(usage);
+ return 1;
+ }
+
+ crawlId = args[0];
+ if (crawlId.equals("-threads") || crawlId.equals("-continue") || crawlId.equals("-noParsing")) {
+ System.err.println(usage);
+ return 1;
+ }
+ for (int i = 1; i < args.length; i++) {
+ if ("-threads".equals(args[i])) {
+ // found -threads option
+ threads = Integer.parseInt(args[++i]);
+ } else if ("-continue".equals(args[i])) {
+ shouldContinue = true;
+ } else if ("-noParsing".equals(args[i])) {
+ isParsing = false;
+ }
+ }
+
+ fetch(threads, crawlId, shouldContinue, isParsing); // run the Fetcher
+
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new FetcherJob(), args);
+ System.exit(res);
+ }
+}
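
An illustrative way to drive the job programmatically, mirroring main() above; the argument values are examples only:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.fetcher.FetcherJob;
    import org.apache.nutch.util.NutchConfiguration;

    public class FetchAllExample {
      public static void main(String[] args) throws Exception {
        // Fetch every generated batch with 10 threads and defer parsing.
        int res = ToolRunner.run(NutchConfiguration.create(), new FetcherJob(),
            new String[] { "-all", "-threads", "10", "-noParsing" });
        System.exit(res);
      }
    }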
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java Wed Jun 30 10:36:20 2010
@@ -4,6 +4,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -16,34 +17,35 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.avro.util.Utf8;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableReducer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.nutch.crawl.CrawlDatumHbase;
-import org.apache.nutch.fetcher.Fetcher;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.nutch.crawl.CrawlStatus;
+import org.apache.nutch.crawl.URLWebPage;
import org.apache.nutch.net.URLFilterException;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.ParseUtil;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.protocol.Protocol;
import org.apache.nutch.protocol.ProtocolFactory;
import org.apache.nutch.protocol.ProtocolOutput;
-import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.ProtocolStatusCodes;
import org.apache.nutch.protocol.RobotRules;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.ProtocolStatus;
+import org.apache.nutch.storage.ProtocolStatusUtils;
+import org.apache.nutch.storage.WebPage;
import org.apache.nutch.util.LogUtil;
+import org.apache.nutch.util.TableUtil;
import org.apache.nutch.util.URLUtil;
-import org.apache.nutch.util.hbase.WebTableColumns;
-import org.apache.nutch.util.hbase.TableUtil;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.gora.mapreduce.GoraReducer;
public class FetcherReducer
-extends TableReducer<ImmutableBytesWritable, FetchEntry, ImmutableBytesWritable> {
+extends GoraReducer<IntWritable, FetchEntry, String, WebPage> {
- public static final Log LOG = Fetcher.LOG;
+ public static final Log LOG = FetcherJob.LOG;
private final AtomicInteger activeThreads = new AtomicInteger(0);
private final AtomicInteger spinWaiting = new AtomicInteger(0);
@@ -57,34 +59,35 @@ extends TableReducer<ImmutableBytesWrita
private QueueFeeder feeder;
- private List<FetcherThread> fetcherThreads = new ArrayList<FetcherThread>();
+ private final List<FetcherThread> fetcherThreads = new ArrayList<FetcherThread>();
private FetchItemQueues fetchQueues;
-
- private static final ImmutableBytesWritable REDUCE_KEY =
- new ImmutableBytesWritable(TableUtil.YES_VAL);
+
+ private boolean isParsing;
+
+ private ParseUtil parseUtil;
/**
* This class described the item to be fetched.
*/
private static class FetchItem {
- WebTableRow row;
+ WebPage page;
String queueID;
String url;
URL u;
- public FetchItem(String url, WebTableRow row, URL u, String queueID) {
- this.row = row;
+ public FetchItem(String url, WebPage page, URL u, String queueID) {
+ this.page = page;
this.url = url;
this.u = u;
this.queueID = queueID;
}
- /** Create an item. Queue id will be created based on <code>byIP</code>
- * argument, either as a protocol + hostname pair, or protocol + IP
- * address pair.
+ /** Create an item. Queue id will be created based on <code>queueMode</code>
+ * argument, either as a protocol + hostname pair, a protocol + IP
+ * address pair, or a protocol + domain pair.
*/
- public static FetchItem create(String url, WebTableRow row, boolean byIP) {
+ public static FetchItem create(String url, WebPage page, String queueMode) {
String queueID;
URL u = null;
try {
@@ -95,7 +98,7 @@ extends TableReducer<ImmutableBytesWrita
}
final String proto = u.getProtocol().toLowerCase();
String host;
- if (byIP) {
+ if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
try {
final InetAddress addr = InetAddress.getByName(u.getHost());
host = addr.getHostAddress();
@@ -104,16 +107,23 @@ extends TableReducer<ImmutableBytesWrita
LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
return null;
}
- } else {
+ }
+ else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)){
+ host = URLUtil.getDomainName(u);
+ if (host == null) {
+ LOG.warn("Unknown domain for url: " + url + ", using URL string as key");
+ host=u.toExternalForm();
+ }
+ }
+ else {
host = u.getHost();
if (host == null) {
- LOG.warn("Unknown host for url: " + url + ", skipping.");
- return null;
+ LOG.warn("Unknown host for url: " + url + ", using URL string as key");
+ host=u.toExternalForm();
}
- host = host.toLowerCase();
}
- queueID = proto + "://" + host;
- return new FetchItem(url, row, u, queueID);
+ queueID = proto + "://" + host.toLowerCase();
+ return new FetchItem(url, page, u, queueID);
}
}
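
The rewritten FetchItem.create above derives the politeness-queue id from
the configured queue mode rather than a boolean byIP flag. A simplified
standalone sketch of the same decision (host name, domain, or IP address)
follows; it uses only java.net, the class name is illustrative, and the
domain branch substitutes u.getHost() for the URLUtil.getDomainName() call
used in the real code.

import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;

public class QueueIdSketch {
  // Build a queue id of the form "<proto>://<key>", where the key depends
  // on the queue mode: host name (default), IP address, or domain.
  static String queueId(String urlString, String queueMode)
      throws MalformedURLException {
    URL u = new URL(urlString);
    String proto = u.getProtocol().toLowerCase();
    String key;
    if ("byIP".equalsIgnoreCase(queueMode)) {
      try {
        key = InetAddress.getByName(u.getHost()).getHostAddress();
      } catch (UnknownHostException e) {
        return null; // the real code logs a warning and skips the URL
      }
    } else if ("byDomain".equalsIgnoreCase(queueMode)) {
      key = u.getHost(); // stand-in for URLUtil.getDomainName(u)
    } else { // "byHost", the default
      key = u.getHost();
    }
    if (key == null) {
      key = u.toExternalForm(); // fall back to the full URL string
    }
    return proto + "://" + key.toLowerCase();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(queueId("http://nutch.apache.org/docs", "byHost"));
  }
}
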
@@ -203,6 +213,12 @@ extends TableReducer<ImmutableBytesWrita
else
nextFetchTime.set(endTime);
}
+
+ public synchronized int emptyQueue() {
+ int presize = queue.size();
+ queue.clear();
+ return presize;
+ }
}
/**
@@ -215,18 +231,30 @@ extends TableReducer<ImmutableBytesWrita
Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
AtomicInteger totalSize = new AtomicInteger(0);
int maxThreads;
- boolean byIP;
+ String queueMode;
long crawlDelay;
long minCrawlDelay;
Configuration conf;
+ long timelimit = -1;
+
+ public static final String QUEUE_MODE_HOST = "byHost";
+ public static final String QUEUE_MODE_DOMAIN = "byDomain";
+ public static final String QUEUE_MODE_IP = "byIP";
public FetchItemQueues(Configuration conf) {
this.conf = conf;
- this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
- // backward-compatible default setting
- this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
+ this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
+ queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
+ // check that the mode is known
+ if (!queueMode.equals(QUEUE_MODE_IP) && !queueMode.equals(QUEUE_MODE_DOMAIN)
+ && !queueMode.equals(QUEUE_MODE_HOST)) {
+ LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost");
+ queueMode = QUEUE_MODE_HOST;
+ }
+ LOG.info("Using queue mode : "+queueMode);
this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+ this.timelimit = conf.getLong("fetcher.timelimit", -1);
}
public int getTotalSize() {
@@ -237,8 +265,8 @@ extends TableReducer<ImmutableBytesWrita
return queues.size();
}
- public void addFetchItem(String url, WebTableRow row) {
- final FetchItem it = FetchItem.create(url, row, byIP);
+ public void addFetchItem(String url, WebPage page) {
+ final FetchItem it = FetchItem.create(url, page, queueMode);
if (it != null) addFetchItem(it);
}
@@ -290,6 +318,29 @@ extends TableReducer<ImmutableBytesWrita
}
return null;
}
+
+ public synchronized int checkTimelimit() {
+ int count = 0;
+ if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+ // emptying the queues
+ for (String id : queues.keySet()) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq.getQueueSize() == 0) continue;
+ LOG.info("* queue: " + id + " >> timelimit! ");
+ int deleted = fiq.emptyQueue();
+ for (int i = 0; i < deleted; i++) {
+ totalSize.decrementAndGet();
+ }
+ count += deleted;
+ }
+ // there might also be a case where totalSize != 0 but the number of
+ // queues == 0; in that case we force totalSize to 0 to avoid blocking
+ if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+ }
+ return count;
+ }
+
public synchronized void dump() {
for (final String id : queues.keySet()) {
@@ -315,7 +366,7 @@ extends TableReducer<ImmutableBytesWrita
private String reprUrl;
private boolean redirecting;
private int redirectCount;
- private Context context;
+ private final Context context;
public FetcherThread(Context context, int num) {
this.setDaemon(true); // don't hang JVM on exit
@@ -358,10 +409,10 @@ extends TableReducer<ImmutableBytesWrita
}
}
lastRequestStart.set(System.currentTimeMillis());
- if (!fit.row.hasColumn(WebTableColumns.REPR_URL, null)) {
- reprUrl = fit.url.toString();
+ if (!fit.page.isReadable(WebPage.Field.REPR_URL.getIndex())) {
+ reprUrl = fit.url;
} else {
- reprUrl = fit.row.getReprUrl();
+ reprUrl = TableUtil.toString(fit.page.getReprUrl());
}
try {
LOG.info("fetching " + fit.url);
@@ -375,15 +426,15 @@ extends TableReducer<ImmutableBytesWrita
}
redirecting = false;
final Protocol protocol = this.protocolFactory.getProtocol(fit.url);
- final RobotRules rules = protocol.getRobotRules(fit.url, fit.row);
+ final RobotRules rules = protocol.getRobotRules(fit.url, fit.page);
if (!rules.isAllowed(fit.u)) {
// unblock
fetchQueues.finishFetchItem(fit, true);
if (LOG.isDebugEnabled()) {
LOG.debug("Denied by robots.txt: " + fit.url);
}
- output(fit, null, ProtocolStatus.STATUS_ROBOTS_DENIED,
- CrawlDatumHbase.STATUS_GONE);
+ output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED,
+ CrawlStatus.STATUS_GONE);
continue;
}
if (rules.getCrawlDelay() > 0) {
@@ -391,79 +442,83 @@ extends TableReducer<ImmutableBytesWrita
// unblock
fetchQueues.finishFetchItem(fit, true);
LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
- output(fit, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatumHbase.STATUS_GONE);
+ output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED, CrawlStatus.STATUS_GONE);
continue;
} else {
final FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
fiq.crawlDelay = rules.getCrawlDelay();
}
}
- final ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.row);
+ final ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.page);
final ProtocolStatus status = output.getStatus();
final Content content = output.getContent();
// unblock queue
fetchQueues.finishFetchItem(fit);
+ context.getCounter("FetcherStatus", "code_"+status.getCode()).increment(1);
+
+ int length = 0;
+ if (content!=null && content.getContent()!=null) length= content.getContent().length;
+ updateStatus(length);
+
switch(status.getCode()) {
- case ProtocolStatus.WOULDBLOCK:
+ case ProtocolStatusCodes.WOULDBLOCK:
// retry ?
fetchQueues.addFetchItem(fit);
break;
- case ProtocolStatus.SUCCESS: // got a page
- output(fit, content, status, CrawlDatumHbase.STATUS_FETCHED);
- updateStatus(content.getContent().length);
+ case ProtocolStatusCodes.SUCCESS: // got a page
+ output(fit, content, status, CrawlStatus.STATUS_FETCHED);
break;
- case ProtocolStatus.MOVED: // redirect
- case ProtocolStatus.TEMP_MOVED:
+ case ProtocolStatusCodes.MOVED: // redirect
+ case ProtocolStatusCodes.TEMP_MOVED:
byte code;
boolean temp;
- if (status.getCode() == ProtocolStatus.MOVED) {
- code = CrawlDatumHbase.STATUS_REDIR_PERM;
+ if (status.getCode() == ProtocolStatusCodes.MOVED) {
+ code = CrawlStatus.STATUS_REDIR_PERM;
temp = false;
} else {
- code = CrawlDatumHbase.STATUS_REDIR_TEMP;
+ code = CrawlStatus.STATUS_REDIR_TEMP;
temp = true;
}
output(fit, content, status, code);
- final String newUrl = status.getMessage();
- handleRedirect(fit.url, newUrl, temp, Fetcher.PROTOCOL_REDIR);
+ final String newUrl = ProtocolStatusUtils.getMessage(status);
+ handleRedirect(fit.url, newUrl, temp, FetcherJob.PROTOCOL_REDIR);
redirecting = false;
break;
- case ProtocolStatus.EXCEPTION:
- logError(fit.url, status.getMessage());
+ case ProtocolStatusCodes.EXCEPTION:
+ logError(fit.url, ProtocolStatusUtils.getMessage(status));
/* FALLTHROUGH */
- case ProtocolStatus.RETRY: // retry
- case ProtocolStatus.BLOCKED:
- output(fit, null, status, CrawlDatumHbase.STATUS_RETRY);
+ case ProtocolStatusCodes.RETRY: // retry
+ case ProtocolStatusCodes.BLOCKED:
+ output(fit, null, status, CrawlStatus.STATUS_RETRY);
break;
- case ProtocolStatus.GONE: // gone
- case ProtocolStatus.NOTFOUND:
- case ProtocolStatus.ACCESS_DENIED:
- case ProtocolStatus.ROBOTS_DENIED:
- output(fit, null, status, CrawlDatumHbase.STATUS_GONE);
+ case ProtocolStatusCodes.GONE: // gone
+ case ProtocolStatusCodes.NOTFOUND:
+ case ProtocolStatusCodes.ACCESS_DENIED:
+ case ProtocolStatusCodes.ROBOTS_DENIED:
+ output(fit, null, status, CrawlStatus.STATUS_GONE);
break;
- case ProtocolStatus.NOTMODIFIED:
- output(fit, null, status, CrawlDatumHbase.STATUS_NOTMODIFIED);
+ case ProtocolStatusCodes.NOTMODIFIED:
+ output(fit, null, status, CrawlStatus.STATUS_NOTMODIFIED);
break;
default:
if (LOG.isWarnEnabled()) {
LOG.warn("Unknown ProtocolStatus: " + status.getCode());
}
- output(fit, null, status, CrawlDatumHbase.STATUS_RETRY);
+ output(fit, null, status, CrawlStatus.STATUS_RETRY);
}
if (redirecting && redirectCount >= maxRedirect) {
fetchQueues.finishFetchItem(fit);
- if (LOG.isInfoEnabled()) {
- LOG.info(" - redirect count exceeded " + fit.url);
- }
- output(fit, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatumHbase.STATUS_GONE);
+ LOG.info(" - redirect count exceeded " + fit.url);
+ output(fit, null, ProtocolStatusUtils.STATUS_REDIR_EXCEEDED,
+ CrawlStatus.STATUS_GONE);
}
} while (redirecting && (redirectCount < maxRedirect));
@@ -471,17 +526,16 @@ extends TableReducer<ImmutableBytesWrita
} catch (final Throwable t) { // unexpected exception
// unblock
fetchQueues.finishFetchItem(fit);
- t.printStackTrace();
logError(fit.url, t.toString());
- output(fit, null, ProtocolStatus.STATUS_FAILED, CrawlDatumHbase.STATUS_RETRY);
+ t.printStackTrace(LogUtil.getDebugStream(LOG));
+ output(fit, null, ProtocolStatusUtils.STATUS_FAILED,
+ CrawlStatus.STATUS_RETRY);
}
}
} catch (final Throwable e) {
- if (LOG.isFatalEnabled()) {
- e.printStackTrace(LogUtil.getFatalStream(LOG));
- LOG.fatal("fetcher caught:"+e.toString());
- }
+ LOG.fatal("fetcher caught:"+e.toString());
+ e.printStackTrace(LogUtil.getFatalStream(LOG));
} finally {
if (fit != null) fetchQueues.finishFetchItem(fit);
activeThreads.decrementAndGet(); // count threads
@@ -499,14 +553,12 @@ extends TableReducer<ImmutableBytesWrita
}
reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
final String reversedNewUrl = TableUtil.reverseUrl(newUrl);
- // TODO: Find a way to use MutablWebTableRow here
- Put put = new Put(Bytes.toBytes(reversedNewUrl));
+ WebPage newWebPage = new WebPage();
if (!reprUrl.equals(url)) {
- put.add(WebTableColumns.REPR_URL, null, Bytes.toBytes(reprUrl));
+ newWebPage.setReprUrl(new Utf8(reprUrl));
}
- put.add(WebTableColumns.METADATA,
- Fetcher.REDIRECT_DISCOVERED, TableUtil.YES_VAL);
- context.write(REDUCE_KEY, put);
+ newWebPage.putToMetadata(FetcherJob.REDIRECT_DISCOVERED, TableUtil.YES_VAL);
+ context.write(reversedNewUrl, newWebPage);
if (LOG.isDebugEnabled()) {
LOG.debug(" - " + redirType + " redirect to " +
reprUrl + " (fetching later)");
@@ -522,27 +574,34 @@ extends TableReducer<ImmutableBytesWrita
private void output(FetchItem fit, Content content,
ProtocolStatus pstatus, byte status)
throws IOException, InterruptedException {
- fit.row.setStatus(status);
- final long prevFetchTime = fit.row.getFetchTime();
- fit.row.setPrevFetchTime(prevFetchTime);
- fit.row.setFetchTime(System.currentTimeMillis());
+ fit.page.setStatus(status);
+ final long prevFetchTime = fit.page.getFetchTime();
+ fit.page.setPrevFetchTime(prevFetchTime);
+ fit.page.setFetchTime(System.currentTimeMillis());
if (pstatus != null) {
- fit.row.setProtocolStatus(pstatus);
+ fit.page.setProtocolStatus(pstatus);
}
if (content != null) {
- fit.row.setContent(content.getContent());
- fit.row.setContentType(content.getContentType());
- fit.row.setBaseUrl(content.getBaseUrl());
+ fit.page.setContent(ByteBuffer.wrap(content.getContent()));
+ fit.page.setContentType(new Utf8(content.getContentType()));
+ fit.page.setBaseUrl(new Utf8(content.getBaseUrl()));
+ }
+ Mark.FETCH_MARK.putMark(fit.page, Mark.GENERATE_MARK.checkMark(fit.page));
+ String key = TableUtil.reverseUrl(fit.url);
+
+ if (isParsing) {
+ URLWebPage redirectedPage = parseUtil.process(key, fit.page);
+ if (redirectedPage != null) {
+ context.write(TableUtil.reverseUrl(redirectedPage.getUrl()),
+ redirectedPage.getDatum());
+ }
}
- fit.row.putMeta(Fetcher.FETCH_MARK, TableUtil.YES_VAL);
- fit.row.makeRowMutation().writeToContext(REDUCE_KEY, context);
+ context.write(key, fit.page);
}
private void logError(String url, String message) {
- if (LOG.isInfoEnabled()) {
- LOG.info("fetch of " + url + " failed with: " + message);
- }
+ LOG.info("fetch of " + url + " failed with: " + message);
errors.incrementAndGet();
}
}
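
The output() method above now writes into the Avro-backed WebPage record, so
raw byte[] content is wrapped in a ByteBuffer and String fields become Avro
Utf8 values. Below is a small sketch of just that wrapping, assuming the
Gora-generated org.apache.nutch.storage.WebPage setters shown in the diff
(setContent, setContentType, setBaseUrl); the helper name is illustrative.

import java.nio.ByteBuffer;

import org.apache.avro.util.Utf8;
import org.apache.nutch.storage.WebPage;

public class WebPageFillSketch {
  // Copy raw fetch output into the Avro-backed record: byte[] fields become
  // ByteBuffers (a view, not a copy), String fields become Utf8.
  static void fillPage(WebPage page, byte[] raw, String contentType,
      String baseUrl) {
    if (raw != null) {
      page.setContent(ByteBuffer.wrap(raw));
    }
    if (contentType != null) {
      page.setContentType(new Utf8(contentType));
    }
    if (baseUrl != null) {
      page.setBaseUrl(new Utf8(baseUrl));
    }
  }
}
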
@@ -557,6 +616,7 @@ extends TableReducer<ImmutableBytesWrita
private final int size;
private Iterator<FetchEntry> currentIter;
boolean hasMore;
+ private long timelimit = -1;
public QueueFeeder(Context context,
FetchItemQueues queues, int size)
@@ -570,13 +630,29 @@ extends TableReducer<ImmutableBytesWrita
if (hasMore) {
currentIter = context.getValues().iterator();
}
+ // the value of the time limit is either -1 or the time when it should finish
+ timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1);
}
@Override
public void run() {
int cnt = 0;
+ int timelimitcount = 0;
try {
while (hasMore) {
+ if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+ // time limit reached: let's simply
+ // read all remaining entries from the input without processing them
+ while (currentIter.hasNext()) {
+ currentIter.next();
+ timelimitcount++;
+ }
+ hasMore = context.nextKey();
+ if (hasMore) {
+ currentIter = context.getValues().iterator();
+ }
+ continue;
+ }
int feed = size - queues.getTotalSize();
if (feed <= 0) {
// queues are full - spin-wait until they have some free space
@@ -584,19 +660,15 @@ extends TableReducer<ImmutableBytesWrita
Thread.sleep(1000);
} catch (final Exception e) {};
continue;
- }
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("-feeding " + feed + " input urls ...");
}
while (feed > 0 && currentIter.hasNext()) {
- FetchEntry entry = new FetchEntry();
- // since currentIter.next() reuses the same
- // FetchEntry object we need to clone it
- Writables.copyWritable(currentIter.next(), entry);
- WebTableRow row = new WebTableRow(entry.getRow());
+ FetchEntry entry = currentIter.next();
final String url =
- TableUtil.unreverseUrl(Bytes.toString(entry.getKey().get()));
- queues.addFetchItem(url, row);
+ TableUtil.unreverseUrl(entry.getKey());
+ queues.addFetchItem(url, entry.getWebPage());
feed--;
cnt++;
}
@@ -612,17 +684,35 @@ extends TableReducer<ImmutableBytesWrita
LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
return;
}
- LOG.info("QueueFeeder finished: total " + cnt + " records.");
+ LOG.info("QueueFeeder finished: total " + cnt + " records. Hit by time limit :"
+ + timelimitcount);
+ context.getCounter("FetcherStatus","HitByTimeLimit-QueueFeeder").increment(timelimitcount);
}
}
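
The QueueFeeder above stops feeding once the configured deadline passes:
remaining input records are iterated but never queued, and the number of
skipped records is reported through a counter. Below is a simplified,
Hadoop-free sketch of that drain step over a plain iterator; the names are
illustrative and not part of the patch.

import java.util.Arrays;
import java.util.Iterator;

public class DrainSketch {
  // Once the deadline has been hit, consume the remaining records without
  // queuing them and report how many were skipped.
  static int drainIfExpired(Iterator<String> records, long deadlineMs) {
    int skipped = 0;
    if (deadlineMs != -1 && System.currentTimeMillis() >= deadlineMs) {
      while (records.hasNext()) {
        records.next(); // read but never fetched
        skipped++;
      }
    }
    return skipped;
  }

  public static void main(String[] args) {
    Iterator<String> it = Arrays.asList("a", "b", "c").iterator();
    System.out.println(drainIfExpired(it, 1L)); // deadline already passed -> 3
  }
}
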
+ private void reportStatus(Context context) throws IOException {
+ StringBuffer status = new StringBuffer();
+ long elapsed = (System.currentTimeMillis() - start)/1000;
+ status.append(spinWaiting).append("/").append(activeThreads).append(" threads spinwaiting\n");
+ status.append(pages).append(" pages, ").append(errors).append(" errors, ");
+ status.append(Math.round(((float)pages.get()*10)/elapsed)/10.0).append(" pages/s, ");
+ status.append(Math.round(((((float)bytes.get())*8)/1024)/elapsed)).append(" kb/s, ");
+ status.append(this.fetchQueues.getTotalSize()).append(" URLs in ");
+ status.append(this.fetchQueues.getQueueCount()).append(" queues");
+ context.setStatus(status.toString());
+ }
+
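
One unit detail worth noting in reportStatus() above: the throughput figure
is computed as bytes * 8 / 1024 per elapsed second, so the "kb/s" label
reports kilobits per second rather than kilobytes, and the pages/s figure is
rounded to one decimal place. A tiny sketch reproducing the same arithmetic,
with an illustrative class name:

public class ThroughputSketch {
  // Same math as the status line: pages/s to one decimal,
  // (bytes * 8 / 1024) / seconds, i.e. kilobits per second.
  static String throughput(long pages, long bytes, long elapsedSeconds) {
    double pagesPerSec =
        Math.round(((float) pages * 10) / elapsedSeconds) / 10.0;
    long kbitPerSec =
        Math.round((((float) bytes) * 8 / 1024) / elapsedSeconds);
    return pagesPerSec + " pages/s, " + kbitPerSec + " kb/s";
  }

  public static void main(String[] args) {
    // 120 pages and 3,000,000 bytes over 60 s -> 2.0 pages/s, 391 kb/s
    System.out.println(throughput(120, 3000000L, 60));
  }
}
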
@Override
public void run(Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
this.fetchQueues = new FetchItemQueues(conf);
- final int threadCount = conf.getInt("fetcher.threads.fetch", 10);
- LOG.info("FetcherHbase: threads: " + threadCount);
+ int threadCount = conf.getInt("fetcher.threads.fetch", 10);
+ isParsing = conf.getBoolean("fetcher.parse", true);
+ if (isParsing) {
+ parseUtil = new ParseUtil(conf);
+ }
+ LOG.info("Fetcher: threads: " + threadCount);
// set non-blocking & no-robots mode for HTTP protocol plugins.
conf.setBoolean(Protocol.CHECK_BLOCKING, false);
@@ -645,12 +735,20 @@ extends TableReducer<ImmutableBytesWrita
} catch (final InterruptedException e) {}
context.progress();
+ reportStatus(context);
LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
+ ", fetchQueues= " + fetchQueues.getQueueCount() +", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
fetchQueues.dump();
}
+
+ // check timelimit
+ if (!feeder.isAlive()) {
+ int hitByTimeLimit = fetchQueues.checkTimelimit();
+ if (hitByTimeLimit != 0) context.getCounter("FetcherStatus","HitByTimeLimit-Queues").increment(hitByTimeLimit);
+ }
+
// some requests seem to hang, despite all intentions
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
LOG.warn("Aborting with " + activeThreads + " hung threads.");
Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java Wed Jun 30 10:36:20 2010
@@ -2,17 +2,17 @@ package org.apache.nutch.fetcher;
import java.net.URL;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.nutch.util.TableUtil;
public class PartitionUrlByHost
-extends Partitioner<ImmutableBytesWritable, FetchEntry> {
+extends Partitioner<IntWritable, FetchEntry> {
@Override
- public int getPartition(ImmutableBytesWritable key,
+ public int getPartition(IntWritable key,
FetchEntry value, int numPartitions) {
- String urlString = Bytes.toString(value.getKey().get());
+ String urlString = TableUtil.unreverseUrl(value.getKey());
URL url = null;