You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ab...@apache.org on 2007/01/17 22:06:54 UTC
svn commit: r497172 - in /lucene/nutch/trunk: bin/nutch
src/java/org/apache/nutch/fetcher/Fetcher.java
src/java/org/apache/nutch/fetcher/Fetcher2.java
Author: ab
Date: Wed Jan 17 13:06:50 2007
New Revision: 497172
URL: http://svn.apache.org/viewvc?view=rev&rev=497172
Log:
Revert accidental change to bin/nutch.
Fix Fetcher.java to correctly split input.
Add Fetcher2 - a queue-based fetcher implementation.
Added:
lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java (with props)
Modified:
lucene/nutch/trunk/bin/nutch
lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
Modified: lucene/nutch/trunk/bin/nutch
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/bin/nutch?view=diff&rev=497172&r1=497171&r2=497172
==============================================================================
--- lucene/nutch/trunk/bin/nutch (original)
+++ lucene/nutch/trunk/bin/nutch Wed Jan 17 13:06:50 2007
@@ -41,6 +41,7 @@
echo " generate generate new segments to fetch from crawl db"
echo " freegen generate new segments to fetch from text files"
echo " fetch fetch a segment's pages"
+ echo " fetch2 fetch a segment's pages using Fetcher2 implementation"
echo " parse parse a segment's pages"
echo " readseg read / dump segment data"
echo " mergesegs merge several segments, with optional filtering and slicing"
@@ -177,6 +178,8 @@
CLASS=org.apache.nutch.tools.FreeGenerator
elif [ "$COMMAND" = "fetch" ] ; then
CLASS=org.apache.nutch.fetcher.Fetcher
+elif [ "$COMMAND" = "fetch2" ] ; then
+ CLASS=org.apache.nutch.fetcher.Fetcher2
elif [ "$COMMAND" = "parse" ] ; then
CLASS=org.apache.nutch.parse.ParseSegment
elif [ "$COMMAND" = "readdb" ] ; then
@@ -220,6 +223,5 @@
fi
# run it
-echo "$JAVA" $JAVA_HEAP_MAX $NUTCH_OPTS -classpath "$CLASSPATH" $CLASS "$@"
exec "$JAVA" $JAVA_HEAP_MAX $NUTCH_OPTS -classpath "$CLASSPATH" $CLASS "$@"
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?view=diff&rev=497172&r1=497171&r2=497172
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Wed Jan 17 13:06:50 2007
@@ -48,9 +48,10 @@
public static class InputFormat extends SequenceFileInputFormat {
/** Don't split inputs, to keep things polite. */
- public InputSplit[] getSplits(FileSystem fs, JobConf job, int nSplits)
+ public InputSplit[] getSplits(JobConf job, int nSplits)
throws IOException {
Path[] files = listPaths(job);
+ FileSystem fs = FileSystem.get(job);
InputSplit[] splits = new InputSplit[files.length];
for (int i = 0; i < files.length; i++) {
splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]), job);
Added: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java?view=auto&rev=497172
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java Wed Jan 17 13:06:50 2007
@@ -0,0 +1,875 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+// Commons Logging imports
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.*;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.parse.*;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.*;
+
+
+/**
+ * A queue-based fetcher.
+ *
+ * <p>This fetcher uses a well-known model of one producer (a QueueFeeder)
+ * and many consumers (FetcherThread-s).
+ *
+ * <p>QueueFeeder reads input fetchlists and
+ * populates a set of FetchItemQueue-s, which hold FetchItem-s that
+ * describe the items to be fetched. There are as many queues as there are unique
+ * hosts, but at any given time the total number of fetch items in all queues
+ * is less than a fixed number (currently set to a multiple of the number of
+ * threads).
+ *
+ * <p>As items are consumed from the queues, the QueueFeeder continues to add new
+ * input items, so that their total count stays fixed (FetcherThread-s may also
+ * add new items to the queues e.g. as a result of redirection) - until all
+ * input items are exhausted, at which point the number of items in the queues
+ * begins to decrease. When this number reaches 0 fetcher will finish.
+ *
+ * <p>This fetcher implementation handles per-host blocking itself, instead
+ * of delegating this work to protocol-specific plugins.
+ * Each per-host queue handles its own "politeness" settings, such as the
+ * maximum number of concurrent requests and crawl delay between consecutive
+ * requests - and also a list of requests in progress, and the time the last
+ * request was finished. As FetcherThread-s ask for new items to be fetched,
+ * queues may return eligible items or null if for "politeness" reasons this
+ * host's queue is not yet ready.
+ *
+ * <p>If there are still unfetched items on the queues, but none of the items
+ * are ready, FetcherThread-s will spin-wait until either some items become
+ * available, or a timeout is reached (at which point the Fetcher will abort,
+ * assuming the task is hung).
+ *
+ * @author Andrzej Bialecki
+ */
+public class Fetcher2 extends Configured implements MapRunnable {
+
+ public static final Log LOG = LogFactory.getLog(Fetcher2.class);
+
+  // Maps each input file to exactly one split, so every fetchlist partition is
+  // consumed by a single map task. Per-host politeness depends on this: splitting
+  // a file could put URLs for one host into concurrently running tasks.
+  public static class InputFormat extends SequenceFileInputFormat {
+    /** Don't split inputs, to keep things polite. */
+    public InputSplit[] getSplits(JobConf job, int nSplits)
+      throws IOException {
+      // nSplits (the requested split count) is intentionally ignored.
+      Path[] files = listPaths(job);
+      FileSplit[] splits = new FileSplit[files.length];
+      FileSystem fs = FileSystem.get(job);
+      for (int i = 0; i < files.length; i++) {
+        splits[i] = new FileSplit(files[i], 0, fs.getLength(files[i]), job);
+      }
+      return splits;
+    }
+  }
+
+ private OutputCollector output;
+ private Reporter reporter;
+
+ private String segmentName;
+ private AtomicInteger activeThreads = new AtomicInteger(0);
+ private AtomicInteger spinWaiting = new AtomicInteger(0);
+
+ private long start = System.currentTimeMillis(); // start time of fetcher run
+ private AtomicLong lastRequestStart = new AtomicLong(start);
+
+ private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
+ private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
+ private AtomicInteger errors = new AtomicInteger(0); // total pages errored
+
+ private boolean storingContent;
+ private boolean parsing;
+ FetchItemQueues fetchQueues;
+ QueueFeeder feeder;
+
+  /**
+   * Describes one item (URL + CrawlDatum) to be fetched. The queueID names the
+   * per-host FetchItemQueue that enforces politeness for this item.
+   */
+  private static class FetchItem {
+    String queueID;   // "proto://host" or "proto://ip", see create()
+    Text url;         // original URL text as read from the fetchlist
+    URL u;            // parsed form of url
+    CrawlDatum datum; // crawl state carried alongside the URL
+
+    public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
+      this.url = url;
+      this.u = u;
+      this.datum = datum;
+      this.queueID = queueID;
+    }
+
+    /** Create an item. Queue id will be created based on <code>byIP</code>
+     * argument, either as a protocol + hostname pair, or protocol + IP
+     * address pair.
+     * Returns null (after logging a warning) when the URL cannot be parsed,
+     * or when byIP is set and the host cannot be resolved.
+     * NOTE(review): with byIP=true this does a synchronous DNS lookup per
+     * item, on the caller's thread (the feeder or a fetcher thread).
+     */
+    public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {
+      String queueID;
+      URL u = null;
+      try {
+        u = new URL(url.toString());
+      } catch (Exception e) {
+        LOG.warn("Cannot parse url: " + url, e);
+        return null;
+      }
+      String proto = u.getProtocol().toLowerCase();
+      String host;
+      if (byIP) {
+        try {
+          InetAddress addr = InetAddress.getByName(u.getHost());
+          host = addr.getHostAddress();
+        } catch (UnknownHostException e) {
+          // unable to resolve it, so don't fall back to host name
+          LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
+          return null;
+        }
+      } else {
+        host = u.getHost();
+        if (host == null) {
+          LOG.warn("Unknown host for url: " + url + ", skipping.");
+          return null;
+        }
+        host = host.toLowerCase();
+      }
+      queueID = proto + "://" + host;
+      return new FetchItem(url, u, datum, queueID);
+    }
+
+    public CrawlDatum getDatum() {
+      return datum;
+    }
+
+    public String getQueueID() {
+      return queueID;
+    }
+
+    public Text getUrl() {
+      return url;
+    }
+
+    public URL getURL2() {
+      return u;
+    }
+  }
+
+  /**
+   * This class handles FetchItems which come from the same host ID (be it
+   * a proto/hostname or proto/IP pair). It also keeps track of requests in
+   * progress and elapsed time between requests.
+   */
+  private static class FetchItemQueue {
+    List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
+    Set<FetchItem> inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
+    AtomicLong endTime = new AtomicLong(); // time the last request on this queue finished
+    long crawlDelay;    // delay enforced between consecutive requests
+    long minCrawlDelay; // minimum delay when concurrent requests are allowed
+    int maxThreads;     // max concurrent requests permitted for this host
+    Configuration conf;
+
+    public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
+      this.conf = conf;
+      this.maxThreads = maxThreads;
+      this.crawlDelay = crawlDelay;
+      this.minCrawlDelay = minCrawlDelay;
+      // ready to start
+      this.endTime.set(System.currentTimeMillis() - crawlDelay);
+    }
+
+    public int getQueueSize() {
+      return queue.size();
+    }
+
+    public int getInProgressSize() {
+      return inProgress.size();
+    }
+
+    /** Mark an in-flight item as done and stamp endTime; null is a no-op. */
+    public void finishFetchItem(FetchItem it) {
+      if (it != null) {
+        inProgress.remove(it);
+        endTime.set(System.currentTimeMillis());
+      }
+    }
+
+    public void addFetchItem(FetchItem it) {
+      if (it == null) return;
+      queue.add(it);
+    }
+
+    public void addInProgressFetchItem(FetchItem it) {
+      if (it == null) return;
+      inProgress.add(it);
+    }
+
+    /**
+     * Return the next eligible item, moving it to inProgress, or null when
+     * the queue is empty, at its concurrency limit, or still within its
+     * politeness delay.
+     * NOTE(review): the delay choice `maxThreads > 1 ? crawlDelay :
+     * minCrawlDelay` applies the FULL crawlDelay when concurrency is allowed
+     * and only the minimum delay for single-threaded queues - this looks
+     * inverted relative to the field comments above; confirm intent.
+     * NOTE(review): this method is not synchronized; callers appear to go
+     * through FetchItemQueues.getFetchItem (which is) - verify no other path.
+     */
+    public FetchItem getFetchItem() {
+      if (inProgress.size() >= maxThreads) return null;
+      long now = System.currentTimeMillis();
+      long last = endTime.get() + (maxThreads > 1 ? crawlDelay : minCrawlDelay);
+      if (last > now) return null;
+      FetchItem it = null;
+      if (queue.size() == 0) return null;
+      try {
+        it = queue.remove(0);
+        inProgress.add(it);
+      } catch (Exception e) {
+        // NOTE(review): empty catch silently absorbs any failure of the
+        // size-check/remove race above and returns null; consider logging.
+      }
+      return it;
+    }
+
+    public synchronized void dump() {
+      LOG.info("  maxThreads    = " + maxThreads);
+      LOG.info("  inProgress    = " + inProgress.size());
+      LOG.info("  crawlDelay    = " + crawlDelay);
+      LOG.info("  minCrawlDelay = " + minCrawlDelay);
+      LOG.info("  endTime       = " + endTime.get());
+      LOG.info("  now           = " + System.currentTimeMillis());
+      for (int i = 0; i < queue.size(); i++) {
+        FetchItem it = queue.get(i);
+        LOG.info("  " + i + ". " + it.url);
+      }
+    }
+  }
+
+  /**
+   * Convenience class - a collection of queues that keeps track of the total
+   * number of items, and provides items eligible for fetching from any queue.
+   */
+  private static class FetchItemQueues {
+    // NOTE(review): DEFAULT_ID is not referenced anywhere in this file's
+    // visible code - confirm it is needed.
+    public static final String DEFAULT_ID = "default";
+    Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
+    AtomicInteger totalSize = new AtomicInteger(0); // queued (not in-progress) items across all queues
+    int maxThreads;
+    boolean byIP;
+    long crawlDelay;
+    long minCrawlDelay;
+    Configuration conf;
+
+    public FetchItemQueues(Configuration conf) {
+      this.conf = conf;
+      this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
+      // backward-compatible default setting
+      // NOTE(review): default here is false, but FetcherThread reads the same
+      // key with default true - if unset, queue IDs created for redirects may
+      // not match the feeder's queue IDs; confirm which default is intended.
+      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
+      this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
+      this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+    }
+
+    public int getTotalSize() {
+      return totalSize.get();
+    }
+
+    public int getQueueCount() {
+      return queues.size();
+    }
+
+    /** Parse the URL into a FetchItem and enqueue it; invalid URLs are dropped. */
+    public void addFetchItem(Text url, CrawlDatum datum) {
+      FetchItem it = FetchItem.create(url, datum, byIP);
+      if (it != null) addFetchItem(it);
+    }
+
+    // NOTE(review): not synchronized, yet it calls getFetchItemQueue (which
+    // is) and bumps totalSize - HashMap access from multiple threads should
+    // all go through synchronized methods; verify callers.
+    public void addFetchItem(FetchItem it) {
+      FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+      fiq.addFetchItem(it);
+      totalSize.incrementAndGet();
+    }
+
+    // NOTE(review): unsynchronized read of `queues` (a plain HashMap) while
+    // other threads may be mutating it inside synchronized methods.
+    public void finishFetchItem(FetchItem it) {
+      FetchItemQueue fiq = queues.get(it.queueID);
+      if (fiq == null) {
+        LOG.warn("Attempting to finish item from unknown queue: " + it);
+        return;
+      }
+      fiq.finishFetchItem(it);
+    }
+
+    /** Get (or lazily create) the queue for the given host ID. */
+    public synchronized FetchItemQueue getFetchItemQueue(String id) {
+      FetchItemQueue fiq = queues.get(id);
+      if (fiq == null) {
+        // initialize queue
+        fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+        queues.put(id, fiq);
+      }
+      return fiq;
+    }
+
+    /**
+     * Return the first eligible item found in any queue, decrementing
+     * totalSize; empty idle queues encountered along the way are reaped.
+     * Returns null when no queue currently has an eligible item.
+     */
+    public synchronized FetchItem getFetchItem() {
+      Iterator it = queues.keySet().iterator();
+      while (it.hasNext()) {
+        FetchItemQueue fiq = queues.get(it.next());
+        // reap empty queues
+        if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+          it.remove();
+          continue;
+        }
+        FetchItem fit = fiq.getFetchItem();
+        if (fit != null) {
+          totalSize.decrementAndGet();
+          return fit;
+        }
+      }
+      return null;
+    }
+
+    public synchronized void dump() {
+      // NOTE(review): `it` below is never used - the for-each loop that
+      // follows does the iteration; this declaration can be deleted.
+      Iterator it = queues.keySet().iterator();
+      for (String id : queues.keySet()) {
+        FetchItemQueue fiq = queues.get(id);
+        if (fiq.getQueueSize() == 0) continue;
+        LOG.info("* queue: " + id);
+        fiq.dump();
+      }
+    }
+  }
+
+  /**
+   * This class feeds the queues with input items, and re-fills them as
+   * items are consumed by FetcherThread-s.
+   * Runs as a daemon thread; stops when the input is exhausted or on the
+   * first read error.
+   */
+  private static class QueueFeeder extends Thread {
+    private RecordReader reader;      // source of <Text url, CrawlDatum> records
+    private FetchItemQueues queues;   // destination queues
+    private int size;                 // target total number of queued items
+
+    public QueueFeeder(RecordReader reader, FetchItemQueues queues, int size) {
+      this.reader = reader;
+      this.queues = queues;
+      this.size = size;
+      this.setDaemon(true);
+      this.setName("QueueFeeder");
+    }
+
+    public void run() {
+      boolean hasMore = true;
+      int cnt = 0; // records read so far (for logging)
+
+      while (hasMore) {
+        int feed = size - queues.getTotalSize();
+        if (feed <= 0) {
+          // queues are full - spin-wait until they have some free space
+          try {
+            Thread.sleep(1000);
+          // NOTE(review): swallows InterruptedException (and clears no flag);
+          // the thread cannot be interrupted out of this loop. The trailing
+          // ';' after the catch is a stray empty statement.
+          } catch (Exception e) {};
+          continue;
+        } else {
+          LOG.debug("-feeding " + feed + " input urls ...");
+          while (feed > 0 && hasMore) {
+            try {
+              Text url = new Text();
+              CrawlDatum datum = new CrawlDatum();
+              hasMore = reader.next(url, datum);
+              if (hasMore) {
+                queues.addFetchItem(url, datum);
+                cnt++;
+                feed--;
+              }
+            } catch (IOException e) {
+              // give up feeding entirely on a read error
+              LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
+              return;
+            }
+          }
+        }
+      }
+      LOG.info("QueueFeeder finished: total " + cnt + " records.");
+    }
+  }
+
+ /**
+ * This class picks items from queues and fetches the pages.
+ */
+ private class FetcherThread extends Thread {
+ private Configuration conf;
+ private URLFilters urlFilters;
+ private ScoringFilters scfilters;
+ private ParseUtil parseUtil;
+ private URLNormalizers normalizers;
+ private ProtocolFactory protocolFactory;
+ private long maxCrawlDelay;
+ private boolean byIP;
+ private int maxRedirect;
+
+    /** Build a fetcher thread, instantiating per-thread filter/parse/protocol
+     * helpers from the job configuration. */
+    public FetcherThread(Configuration conf) {
+      this.setDaemon(true);                   // don't hang JVM on exit
+      this.setName("FetcherThread");          // use an informative name
+      this.conf = conf;
+      this.urlFilters = new URLFilters(conf);
+      this.scfilters = new ScoringFilters(conf);
+      this.parseUtil = new ParseUtil(conf);
+      this.protocolFactory = new ProtocolFactory(conf);
+      this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
+      this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+      // backward-compatible default setting
+      // NOTE(review): default is true here but FetchItemQueues reads the same
+      // key with default false - when the key is unset, FetchItem-s created
+      // here for redirects use IP-based queue IDs while the feeder's items use
+      // hostname-based IDs; confirm which default is intended.
+      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
+      this.maxRedirect = conf.getInt("http.redirect.max", 3);
+    }
+
+ public void run() {
+ activeThreads.incrementAndGet(); // count threads
+
+ FetchItem fit = null;
+ try {
+
+ while (true) {
+ fit = fetchQueues.getFetchItem();
+ if (fit == null) {
+ if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
+ LOG.debug(getName() + " spin-waiting ...");
+ // spin-wait.
+ spinWaiting.incrementAndGet();
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {}
+ spinWaiting.decrementAndGet();
+ continue;
+ } else {
+ // all done, finish this thread
+ return;
+ }
+ }
+ lastRequestStart.set(System.currentTimeMillis());
+ try {
+ if (LOG.isInfoEnabled()) { LOG.info("fetching " + fit.url); }
+
+ // fetch the page
+ boolean redirecting = false;
+ int redirectCount = 0;
+ do {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("redirectCount=" + redirectCount);
+ }
+ redirecting = false;
+ Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
+ RobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
+ if (!rules.isAllowed(fit.u)) {
+ // unblock
+ fetchQueues.finishFetchItem(fit);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Denied by robots.txt: " + fit.url);
+ }
+ output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
+ continue;
+ }
+ if (rules.getCrawlDelay() > 0) {
+ if (rules.getCrawlDelay() > maxCrawlDelay) {
+ // unblock
+ fetchQueues.finishFetchItem(fit);
+ LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
+ output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
+ continue;
+ } else {
+ FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+ fiq.crawlDelay = rules.getCrawlDelay();
+ }
+ }
+ ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
+ ProtocolStatus status = output.getStatus();
+ Content content = output.getContent();
+ ParseStatus pstatus = null;
+ // unblock queue
+ fetchQueues.finishFetchItem(fit);
+
+ switch(status.getCode()) {
+
+ case ProtocolStatus.WOULDBLOCK:
+ // unblock
+ fetchQueues.finishFetchItem(fit);
+ // retry ?
+ fetchQueues.addFetchItem(fit);
+ break;
+
+ case ProtocolStatus.SUCCESS: // got a page
+ pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
+ updateStatus(content.getContent().length);
+ if (pstatus != null && pstatus.isSuccess() &&
+ pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+ String newUrl = pstatus.getMessage();
+ newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+ newUrl = this.urlFilters.filter(newUrl);
+ if (newUrl != null && !newUrl.equals(fit.url.toString())) {
+ output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_REDIR_PERM);
+ Text redirUrl = new Text(newUrl);
+ if (maxRedirect > 0) {
+ redirecting = true;
+ redirectCount++;
+ fit = FetchItem.create(redirUrl, new CrawlDatum(), byIP);
+ FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+ fiq.addInProgressFetchItem(fit);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - content redirect to " + redirUrl + " (fetching now)");
+ }
+ } else {
+ output(redirUrl, new CrawlDatum(), null, null, CrawlDatum.STATUS_LINKED);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - content redirect to " + redirUrl + " (fetching later)");
+ }
+ }
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(" - content redirect skipped: " +
+ (newUrl != null ? "to same url" : "filtered"));
+ }
+ }
+ break;
+
+ case ProtocolStatus.MOVED: // redirect
+ case ProtocolStatus.TEMP_MOVED:
+ int code;
+ if (status.getCode() == ProtocolStatus.MOVED) {
+ code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
+ } else {
+ code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
+ }
+ output(fit.url, fit.datum, content, status, code);
+ String newUrl = status.getMessage();
+ newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+ newUrl = this.urlFilters.filter(newUrl);
+ if (newUrl != null && !newUrl.equals(fit.url.toString())) {
+ Text redirUrl = new Text(newUrl);
+ if (maxRedirect > 0) {
+ redirecting = true;
+ redirectCount++;
+ fit = FetchItem.create(redirUrl, new CrawlDatum(), byIP);
+ FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+ fiq.addInProgressFetchItem(fit);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - protocol redirect to " + redirUrl + " (fetching now)");
+ }
+ } else {
+ output(redirUrl, new CrawlDatum(), null, null, CrawlDatum.STATUS_LINKED);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - protocol redirect to " + redirUrl + " (fetching later)");
+ }
+ }
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug(" - protocol redirect skipped: " +
+ (newUrl != null ? "to same url" : "filtered"));
+ }
+ break;
+
+ case ProtocolStatus.EXCEPTION:
+ logError(fit.url, status.getMessage());
+ /* FALLTHROUGH */
+ case ProtocolStatus.RETRY: // retry
+ fit.datum.setRetriesSinceFetch(fit.datum.getRetriesSinceFetch()+1);
+ /* FALLTHROUGH */
+ // intermittent blocking - retry without increasing the counter
+ case ProtocolStatus.BLOCKED:
+ output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
+ break;
+
+ case ProtocolStatus.GONE: // gone
+ case ProtocolStatus.NOTFOUND:
+ case ProtocolStatus.ACCESS_DENIED:
+ case ProtocolStatus.ROBOTS_DENIED:
+ case ProtocolStatus.NOTMODIFIED:
+ output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
+ break;
+
+ default:
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+ }
+ output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
+ }
+
+ if (redirecting && redirectCount >= maxRedirect) {
+ fetchQueues.finishFetchItem(fit);
+ if (LOG.isInfoEnabled()) {
+ LOG.info(" - redirect count exceeded " + fit.url);
+ }
+ output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE);
+ }
+
+ } while (redirecting && (redirectCount < maxRedirect));
+
+ } catch (Throwable t) { // unexpected exception
+ // unblock
+ fetchQueues.finishFetchItem(fit);
+ logError(fit.url, t.toString());
+ output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY);
+ }
+ }
+
+ } catch (Throwable e) {
+ if (LOG.isFatalEnabled()) {
+ e.printStackTrace(LogUtil.getFatalStream(LOG));
+ LOG.fatal("fetcher caught:"+e.toString());
+ }
+ } finally {
+ if (fit != null) fetchQueues.finishFetchItem(fit);
+ activeThreads.decrementAndGet(); // count threads
+ LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
+ }
+ }
+
+    /** Log a fetch failure at info level and bump the global error counter.
+     * The counter is incremented even when info logging is disabled. */
+    private void logError(Text url, String message) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("fetch of " + url + " failed with: " + message);
+      }
+      errors.incrementAndGet();
+    }
+
+ private ParseStatus output(Text key, CrawlDatum datum,
+ Content content, ProtocolStatus pstatus, int status) {
+
+ datum.setStatus(status);
+ datum.setFetchTime(System.currentTimeMillis());
+ if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+
+ if (content == null) {
+ String url = key.toString();
+ content = new Content(url, url, new byte[0], "", new Metadata(), this.conf);
+ }
+ Metadata metadata = content.getMetadata();
+ // add segment to metadata
+ metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
+ // add score to content metadata so that ParseSegment can pick it up.
+ try {
+ scfilters.passScoreBeforeParsing(key, datum, content);
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ e.printStackTrace(LogUtil.getWarnStream(LOG));
+ LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ }
+ }
+
+ Parse parse = null;
+ if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+ ParseStatus parseStatus;
+ try {
+ parse = this.parseUtil.parse(content);
+ parseStatus = parse.getData().getStatus();
+ } catch (Exception e) {
+ parseStatus = new ParseStatus(e);
+ }
+ if (!parseStatus.isSuccess()) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Error parsing: " + key + ": " + parseStatus);
+ }
+ parse = parseStatus.getEmptyParse(getConf());
+ }
+ // Calculate page signature. For non-parsing fetchers this will
+ // be done in ParseSegment
+ byte[] signature = SignatureFactory.getSignature(getConf()).calculate(content, parse);
+ metadata.set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
+ datum.setSignature(signature);
+ // Ensure segment name and score are in parseData metadata
+ parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
+ parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
+ try {
+ scfilters.passScoreAfterParsing(key, content, parse);
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ e.printStackTrace(LogUtil.getWarnStream(LOG));
+ LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ }
+ }
+
+ }
+
+ try {
+ output.collect
+ (key,
+ new FetcherOutput(datum,
+ storingContent ? content : null,
+ parse != null ? new ParseImpl(parse) : null));
+ } catch (IOException e) {
+ if (LOG.isFatalEnabled()) {
+ e.printStackTrace(LogUtil.getFatalStream(LOG));
+ LOG.fatal("fetcher caught:"+e.toString());
+ }
+ }
+ if (parse != null) return parse.getData().getStatus();
+ else return null;
+ }
+
+ }
+
+ public Fetcher2() { super(null); }
+
+ public Fetcher2(Configuration conf) { super(conf); }
+
+ private void updateStatus(int bytesInPage) throws IOException {
+ pages.incrementAndGet();
+ bytes.addAndGet(bytesInPage);
+ }
+
+
+  /** Push a human-readable progress line (pages, errors, rates) to the task
+   * reporter. Rates are bytes*8/1024 per second, i.e. kilobits/s. */
+  private void reportStatus() throws IOException {
+    String status;
+    long elapsed = (System.currentTimeMillis() - start)/1000;
+    // NOTE(review): if called within the first second of the run, elapsed is 0
+    // and the float divisions below produce Infinity/NaN in the status string.
+    status =
+      pages+" pages, "+errors+" errors, "
+      + Math.round(((float)pages.get()*10)/elapsed)/10.0+" pages/s, "
+      + Math.round(((((float)bytes.get())*8)/1024)/elapsed)+" kb/s, ";
+    reporter.setStatus(status);
+  }
+
+ public void configure(JobConf job) {
+ setConf(job);
+
+ this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
+ this.storingContent = isStoringContent(job);
+ this.parsing = isParsing(job);
+
+// if (job.getBoolean("fetcher.verbose", false)) {
+// LOG.setLevel(Level.FINE);
+// }
+ }
+
+ public void close() {}
+
+ public static boolean isParsing(Configuration conf) {
+ return conf.getBoolean("fetcher.parse", true);
+ }
+
+ public static boolean isStoringContent(Configuration conf) {
+ return conf.getBoolean("fetcher.store.content", true);
+ }
+
+  /**
+   * MapRunnable entry point: starts the QueueFeeder and the fetcher threads,
+   * then polls once per second, reporting status, until all threads finish or
+   * the hang-detection timeout fires.
+   */
+  public void run(RecordReader input, OutputCollector output,
+                  Reporter reporter) throws IOException {
+
+    this.output = output;
+    this.reporter = reporter;
+    this.fetchQueues = new FetchItemQueues(getConf());
+
+    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
+
+    // keep at most threadCount * 50 items queued at any time
+    feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
+    //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
+    feeder.start();
+
+    // set non-blocking & no-robots mode for HTTP protocol plugins.
+    // (blocking and robots handling are done by the queues in this class)
+    getConf().setBoolean("http.plugin.check.blocking", false);
+    getConf().setBoolean("http.plugin.check.robots", false);
+
+    for (int i = 0; i < threadCount; i++) {           // spawn threads
+      new FetcherThread(getConf()).start();
+    }
+
+    // select a timeout that avoids a task timeout
+    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
+
+    do {                                              // wait for threads to exit
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {}
+
+      reportStatus();
+      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
+          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+
+      if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
+        fetchQueues.dump();
+      }
+      // some requests seem to hang, despite all intentions
+      // NOTE(review): this return abandons the still-running fetcher threads;
+      // they are daemons, so they die with the task JVM, but any in-flight
+      // output at that moment is lost.
+      if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Aborting with "+activeThreads+" hung threads.");
+        }
+        return;
+      }
+
+    } while (activeThreads.get() > 0);
+    LOG.info("-activeThreads=" + activeThreads);
+
+  }
+
+ public void fetch(Path segment, int threads, boolean parsing)
+ throws IOException {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: starting");
+ LOG.info("Fetcher: segment: " + segment);
+ }
+
+ JobConf job = new NutchJob(getConf());
+ job.setJobName("fetch " + segment);
+
+ job.setInt("fetcher.threads.fetch", threads);
+ job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
+ job.setBoolean("fetcher.parse", parsing);
+
+ // for politeness, don't permit parallel execution of a single task
+ job.setSpeculativeExecution(false);
+
+ job.setInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+ job.setInputFormat(InputFormat.class);
+
+ job.setMapRunnerClass(Fetcher2.class);
+
+ job.setOutputPath(segment);
+ job.setOutputFormat(FetcherOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(FetcherOutput.class);
+
+ JobClient.runJob(job);
+ if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
+ }
+
+
+ /** Run the fetcher. */
+ public static void main(String[] args) throws Exception {
+
+ String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";
+
+ if (args.length < 1) {
+ System.err.println(usage);
+ System.exit(-1);
+ }
+
+ Path segment = new Path(args[0]);
+
+ Configuration conf = NutchConfiguration.create();
+
+ int threads = conf.getInt("fetcher.threads.fetch", 10);
+ boolean parsing = true;
+
+ for (int i = 1; i < args.length; i++) { // parse command line
+ if (args[i].equals("-threads")) { // found -threads option
+ threads = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-noParsing")) parsing = false;
+ }
+
+ conf.setInt("fetcher.threads.fetch", threads);
+ if (!parsing) {
+ conf.setBoolean("fetcher.parse", parsing);
+ }
+ Fetcher2 fetcher = new Fetcher2(conf); // make a Fetcher
+
+ fetcher.fetch(segment, threads, parsing); // run the Fetcher
+
+ }
+}
Propchange: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher2.java
------------------------------------------------------------------------------
svn:eol-style = native