You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2015/01/29 06:39:03 UTC
svn commit: r1655526 [4/26] - in /nutch/trunk: ./
src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/
src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/metadata/
src/java/org/apache/nutch/net/ src/java/org/apache/nutch/net/pr...
Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Thu Jan 29 05:38:59 2015
@@ -55,37 +55,40 @@ import crawlercommons.robots.BaseRobotRu
/**
* A queue-based fetcher.
- *
- * <p>This fetcher uses a well-known model of one producer (a QueueFeeder)
- * and many consumers (FetcherThread-s).
- *
- * <p>QueueFeeder reads input fetchlists and
- * populates a set of FetchItemQueue-s, which hold FetchItem-s that
- * describe the items to be fetched. There are as many queues as there are unique
- * hosts, but at any given time the total number of fetch items in all queues
- * is less than a fixed number (currently set to a multiple of the number of
- * threads).
- *
- * <p>As items are consumed from the queues, the QueueFeeder continues to add new
+ *
+ * <p>
+ * This fetcher uses a well-known model of one producer (a QueueFeeder) and many
+ * consumers (FetcherThread-s).
+ *
+ * <p>
+ * QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s,
+ * which hold FetchItem-s that describe the items to be fetched. There are as
+ * many queues as there are unique hosts, but at any given time the total number
+ * of fetch items in all queues is less than a fixed number (currently set to a
+ * multiple of the number of threads).
+ *
+ * <p>
+ * As items are consumed from the queues, the QueueFeeder continues to add new
* input items, so that their total count stays fixed (FetcherThread-s may also
* add new items to the queues e.g. as a results of redirection) - until all
* input items are exhausted, at which point the number of items in the queues
* begins to decrease. When this number reaches 0 fetcher will finish.
- *
- * <p>This fetcher implementation handles per-host blocking itself, instead
- * of delegating this work to protocol-specific plugins.
- * Each per-host queue handles its own "politeness" settings, such as the
- * maximum number of concurrent requests and crawl delay between consecutive
- * requests - and also a list of requests in progress, and the time the last
- * request was finished. As FetcherThread-s ask for new items to be fetched,
- * queues may return eligible items or null if for "politeness" reasons this
- * host's queue is not yet ready.
- *
- * <p>If there are still unfetched items in the queues, but none of the items
- * are ready, FetcherThread-s will spin-wait until either some items become
+ *
+ * <p>
+ * This fetcher implementation handles per-host blocking itself, instead of
+ * delegating this work to protocol-specific plugins. Each per-host queue
+ * handles its own "politeness" settings, such as the maximum number of
+ * concurrent requests and crawl delay between consecutive requests - and also a
+ * list of requests in progress, and the time the last request was finished. As
+ * FetcherThread-s ask for new items to be fetched, queues may return eligible
+ * items or null if for "politeness" reasons this host's queue is not yet ready.
+ *
+ * <p>
+ * If there are still unfetched items in the queues, but none of the items are
+ * ready, FetcherThread-s will spin-wait until either some items become
* available, or a timeout is reached (at which point the Fetcher will abort,
* assuming the task is hung).
- *
+ *
* @author Andrzej Bialecki
*/
public class Fetcher extends Configured implements Tool,
@@ -99,16 +102,16 @@ public class Fetcher extends Configured
public static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
- public static class InputFormat extends SequenceFileInputFormat<Text, CrawlDatum> {
+ public static class InputFormat extends
+ SequenceFileInputFormat<Text, CrawlDatum> {
/** Don't split inputs, to keep things polite. */
- public InputSplit[] getSplits(JobConf job, int nSplits)
- throws IOException {
+ public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException {
FileStatus[] files = listStatus(job);
FileSplit[] splits = new FileSplit[files.length];
for (int i = 0; i < files.length; i++) {
FileStatus cur = files[i];
- splits[i] = new FileSplit(cur.getPath(), 0,
- cur.getLen(), (String[])null);
+ splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(),
+ (String[]) null);
}
return splits;
}
@@ -124,8 +127,8 @@ public class Fetcher extends Configured
private long start = System.currentTimeMillis(); // start time of fetcher run
private AtomicLong lastRequestStart = new AtomicLong(start);
- private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
- private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
+ private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
+ private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
private AtomicInteger errors = new AtomicInteger(0); // total pages errored
private boolean storingContent;
@@ -149,7 +152,8 @@ public class Fetcher extends Configured
this(url, u, datum, queueID, 0);
}
- public FetchItem(Text url, URL u, CrawlDatum datum, String queueID, int outlinkDepth) {
+ public FetchItem(Text url, URL u, CrawlDatum datum, String queueID,
+ int outlinkDepth) {
this.url = url;
this.u = u;
this.datum = datum;
@@ -157,15 +161,17 @@ public class Fetcher extends Configured
this.outlinkDepth = outlinkDepth;
}
- /** Create an item. Queue id will be created based on <code>queueMode</code>
- * argument, either as a protocol + hostname pair, protocol + IP
- * address pair or protocol+domain pair.
+ /**
+ * Create an item. Queue id will be created based on <code>queueMode</code>
+ * argument, either as a protocol + hostname pair, protocol + IP address
+ * pair or protocol+domain pair.
*/
- public static FetchItem create(Text url, CrawlDatum datum, String queueMode) {
+ public static FetchItem create(Text url, CrawlDatum datum, String queueMode) {
return create(url, datum, queueMode, 0);
}
- public static FetchItem create(Text url, CrawlDatum datum, String queueMode, int outlinkDepth) {
+ public static FetchItem create(Text url, CrawlDatum datum,
+ String queueMode, int outlinkDepth) {
String queueID;
URL u = null;
try {
@@ -185,19 +191,18 @@ public class Fetcher extends Configured
LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
return null;
}
- }
- else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)){
+ } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) {
key = URLUtil.getDomainName(u);
if (key == null) {
- LOG.warn("Unknown domain for url: " + url + ", using URL string as key");
- key=u.toExternalForm();
+ LOG.warn("Unknown domain for url: " + url
+ + ", using URL string as key");
+ key = u.toExternalForm();
}
- }
- else {
+ } else {
key = u.getHost();
if (key == null) {
LOG.warn("Unknown host for url: " + url + ", using URL string as key");
- key=u.toExternalForm();
+ key = u.toExternalForm();
}
}
queueID = proto + "://" + key.toLowerCase();
@@ -222,13 +227,14 @@ public class Fetcher extends Configured
}
/**
- * This class handles FetchItems which come from the same host ID (be it
- * a proto/hostname or proto/IP pair). It also keeps track of requests in
+ * This class handles FetchItems which come from the same host ID (be it a
+ * proto/hostname or proto/IP pair). It also keeps track of requests in
* progress and elapsed time between requests.
*/
private static class FetchItemQueue {
- List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
- AtomicInteger inProgress = new AtomicInteger();
+ List<FetchItem> queue = Collections
+ .synchronizedList(new LinkedList<FetchItem>());
+ AtomicInteger inProgress = new AtomicInteger();
AtomicLong nextFetchTime = new AtomicLong();
AtomicInteger exceptionCounter = new AtomicInteger();
long crawlDelay;
@@ -236,7 +242,8 @@ public class Fetcher extends Configured
int maxThreads;
Configuration conf;
- public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
+ public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
+ long minCrawlDelay) {
this.conf = conf;
this.maxThreads = maxThreads;
this.crawlDelay = crawlDelay;
@@ -271,26 +278,33 @@ public class Fetcher extends Configured
}
public void addFetchItem(FetchItem it) {
- if (it == null) return;
+ if (it == null)
+ return;
queue.add(it);
}
public void addInProgressFetchItem(FetchItem it) {
- if (it == null) return;
+ if (it == null)
+ return;
inProgress.incrementAndGet();
}
public FetchItem getFetchItem() {
- if (inProgress.get() >= maxThreads) return null;
+ if (inProgress.get() >= maxThreads)
+ return null;
long now = System.currentTimeMillis();
- if (nextFetchTime.get() > now) return null;
+ if (nextFetchTime.get() > now)
+ return null;
FetchItem it = null;
- if (queue.size() == 0) return null;
+ if (queue.size() == 0)
+ return null;
try {
it = queue.remove(0);
inProgress.incrementAndGet();
} catch (Exception e) {
- LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
+ LOG.error(
+ "Cannot remove FetchItem from queue or cannot add it to inProgress queue",
+ e);
}
return it;
}
@@ -314,7 +328,8 @@ public class Fetcher extends Configured
private void setEndTime(long endTime, boolean asap) {
if (!asap)
- nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
+ nextFetchTime.set(endTime
+ + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
else
nextFetchTime.set(endTime);
}
@@ -346,17 +361,21 @@ public class Fetcher extends Configured
this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
// check that the mode is known
- if (!queueMode.equals(QUEUE_MODE_IP) && !queueMode.equals(QUEUE_MODE_DOMAIN)
+ if (!queueMode.equals(QUEUE_MODE_IP)
+ && !queueMode.equals(QUEUE_MODE_DOMAIN)
&& !queueMode.equals(QUEUE_MODE_HOST)) {
- LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost");
+ LOG.error("Unknown partition mode : " + queueMode
+ + " - forcing to byHost");
queueMode = QUEUE_MODE_HOST;
}
- LOG.info("Using queue mode : "+queueMode);
+ LOG.info("Using queue mode : " + queueMode);
this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
- this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+ this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
+ 0.0f) * 1000);
this.timelimit = conf.getLong("fetcher.timelimit", -1);
- this.maxExceptionsPerQueue = conf.getInt("fetcher.max.exceptions.per.queue", -1);
+ this.maxExceptionsPerQueue = conf.getInt(
+ "fetcher.max.exceptions.per.queue", -1);
}
public int getTotalSize() {
@@ -369,7 +388,8 @@ public class Fetcher extends Configured
public void addFetchItem(Text url, CrawlDatum datum) {
FetchItem it = FetchItem.create(url, datum, queueMode);
- if (it != null) addFetchItem(it);
+ if (it != null)
+ addFetchItem(it);
}
public synchronized void addFetchItem(FetchItem it) {
@@ -402,8 +422,8 @@ public class Fetcher extends Configured
}
public synchronized FetchItem getFetchItem() {
- Iterator<Map.Entry<String, FetchItemQueue>> it =
- queues.entrySet().iterator();
+ Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
+ .iterator();
while (it.hasNext()) {
FetchItemQueue fiq = it.next().getValue();
// reap empty queues
@@ -431,7 +451,8 @@ public class Fetcher extends Configured
// there might also be a case where totalsize !=0 but number of queues
// == 0
// in which case we simply force it to 0 to avoid blocking
- if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+ if (totalSize.get() != 0 && queues.size() == 0)
+ totalSize.set(0);
}
return count;
}
@@ -442,7 +463,8 @@ public class Fetcher extends Configured
for (String id : queues.keySet()) {
FetchItemQueue fiq = queues.get(id);
- if (fiq.getQueueSize() == 0) continue;
+ if (fiq.getQueueSize() == 0)
+ continue;
LOG.info("* queue: " + id + " >> dropping! ");
int deleted = fiq.emptyQueue();
for (int i = 0; i < deleted; i++) {
@@ -457,7 +479,7 @@ public class Fetcher extends Configured
/**
* Increment the exception counter of a queue in case of an exception e.g.
* timeout; when higher than a given threshold simply empty the queue.
- *
+ *
* @param queueid
* @return number of purged items
*/
@@ -470,7 +492,7 @@ public class Fetcher extends Configured
return 0;
}
int excCount = fiq.incrementExceptionCounter();
- if (maxExceptionsPerQueue!= -1 && excCount >= maxExceptionsPerQueue) {
+ if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
// too many exceptions for items in this queue - purge it
int deleted = fiq.emptyQueue();
LOG.info("* queue: " + queueid + " >> removed " + deleted
@@ -483,11 +505,11 @@ public class Fetcher extends Configured
return 0;
}
-
public synchronized void dump() {
for (String id : queues.keySet()) {
FetchItemQueue fiq = queues.get(id);
- if (fiq.getQueueSize() == 0) continue;
+ if (fiq.getQueueSize() == 0)
+ continue;
LOG.info("* queue: " + id);
fiq.dump();
}
@@ -495,8 +517,8 @@ public class Fetcher extends Configured
}
/**
- * This class feeds the queues with input items, and re-fills them as
- * items are consumed by FetcherThread-s.
+ * This class feeds the queues with input items, and re-fills them as items
+ * are consumed by FetcherThread-s.
*/
private static class QueueFeeder extends Thread {
private RecordReader<Text, CrawlDatum> reader;
@@ -541,7 +563,9 @@ public class Fetcher extends Configured
// queues are full - spin-wait until they have some free space
try {
Thread.sleep(1000);
- } catch (Exception e) {};
+ } catch (Exception e) {
+ }
+ ;
continue;
} else {
LOG.debug("-feeding " + feed + " input urls ...");
@@ -562,8 +586,8 @@ public class Fetcher extends Configured
}
}
}
- LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :"
- + timelimitcount);
+ LOG.info("QueueFeeder finished: total " + cnt
+ + " records + hit by time limit :" + timelimitcount);
}
}
@@ -595,12 +619,12 @@ public class Fetcher extends Configured
private int outlinksDepthDivisor;
private boolean skipTruncated;
-
+
private boolean halted = false;
public FetcherThread(Configuration conf) {
- this.setDaemon(true); // don't hang JVM on exit
- this.setName("FetcherThread"); // use an informative name
+ this.setDaemon(true); // don't hang JVM on exit
+ this.setName("FetcherThread"); // use an informative name
this.conf = conf;
this.urlFilters = new URLFilters(conf);
this.scfilters = new ScoringFilters(conf);
@@ -609,26 +633,33 @@ public class Fetcher extends Configured
this.protocolFactory = new ProtocolFactory(conf);
this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
- queueMode = conf.get("fetcher.queue.mode", FetchItemQueues.QUEUE_MODE_HOST);
+ queueMode = conf.get("fetcher.queue.mode",
+ FetchItemQueues.QUEUE_MODE_HOST);
// check that the mode is known
- if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP) && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
+ if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP)
+ && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
&& !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) {
- LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost");
+ LOG.error("Unknown partition mode : " + queueMode
+ + " - forcing to byHost");
queueMode = FetchItemQueues.QUEUE_MODE_HOST;
}
- LOG.info("Using queue mode : "+queueMode);
+ LOG.info("Using queue mode : " + queueMode);
this.maxRedirect = conf.getInt("http.redirect.max", 3);
- this.ignoreExternalLinks =
- conf.getBoolean("db.ignore.external.links", false);
+ this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links",
+ false);
maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
- maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE : maxOutlinksPerPage;
+ maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE
+ : maxOutlinksPerPage;
interval = conf.getInt("db.fetch.interval.default", 2592000);
ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1);
- outlinksIgnoreExternal = conf.getBoolean("fetcher.follow.outlinks.ignore.external", false);
- maxOutlinkDepthNumLinks = conf.getInt("fetcher.follow.outlinks.num.links", 4);
- outlinksDepthDivisor = conf.getInt("fetcher.follow.outlinks.depth.divisor", 2);
+ outlinksIgnoreExternal = conf.getBoolean(
+ "fetcher.follow.outlinks.ignore.external", false);
+ maxOutlinkDepthNumLinks = conf.getInt(
+ "fetcher.follow.outlinks.num.links", 4);
+ outlinksDepthDivisor = conf.getInt(
+ "fetcher.follow.outlinks.depth.divisor", 2);
}
@SuppressWarnings("fallthrough")
@@ -645,7 +676,7 @@ public class Fetcher extends Configured
fit = null;
return;
}
-
+
fit = fetchQueues.getFetchItem();
if (fit == null) {
if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
@@ -654,8 +685,9 @@ public class Fetcher extends Configured
spinWaiting.incrementAndGet();
try {
Thread.sleep(500);
- } catch (Exception e) {}
- spinWaiting.decrementAndGet();
+ } catch (Exception e) {
+ }
+ spinWaiting.decrementAndGet();
continue;
} else {
// all done, finish this thread
@@ -664,8 +696,8 @@ public class Fetcher extends Configured
}
}
lastRequestStart.set(System.currentTimeMillis());
- Text reprUrlWritable =
- (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
+ Text reprUrlWritable = (Text) fit.datum.getMetaData().get(
+ Nutch.WRITABLE_REPR_URL_KEY);
if (reprUrlWritable == null) {
reprUrl = fit.url.toString();
} else {
@@ -677,14 +709,16 @@ public class Fetcher extends Configured
redirectCount = 0;
do {
if (LOG.isInfoEnabled()) {
- LOG.info("fetching " + fit.url + " (queue crawl delay=" +
- fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay + "ms)");
+ LOG.info("fetching " + fit.url + " (queue crawl delay="
+ + fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay
+ + "ms)");
}
if (LOG.isDebugEnabled()) {
LOG.debug("redirectCount=" + redirectCount);
}
redirecting = false;
- Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
+ Protocol protocol = this.protocolFactory.getProtocol(fit.url
+ .toString());
BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
if (!rules.isAllowed(fit.u.toString())) {
// unblock
@@ -692,7 +726,9 @@ public class Fetcher extends Configured
if (LOG.isDebugEnabled()) {
LOG.debug("Denied by robots.txt: " + fit.url);
}
- output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
+ output(fit.url, fit.datum, null,
+ ProtocolStatus.STATUS_ROBOTS_DENIED,
+ CrawlDatum.STATUS_FETCH_GONE);
reporter.incrCounter("FetcherStatus", "robots_denied", 1);
continue;
}
@@ -700,19 +736,27 @@ public class Fetcher extends Configured
if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
// unblock
fetchQueues.finishFetchItem(fit, true);
- LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
- output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
- reporter.incrCounter("FetcherStatus", "robots_denied_maxcrawldelay", 1);
+ LOG.debug("Crawl-Delay for " + fit.url + " too long ("
+ + rules.getCrawlDelay() + "), skipping");
+ output(fit.url, fit.datum, null,
+ ProtocolStatus.STATUS_ROBOTS_DENIED,
+ CrawlDatum.STATUS_FETCH_GONE);
+ reporter.incrCounter("FetcherStatus",
+ "robots_denied_maxcrawldelay", 1);
continue;
} else {
- FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+ FetchItemQueue fiq = fetchQueues
+ .getFetchItemQueue(fit.queueID);
fiq.crawlDelay = rules.getCrawlDelay();
if (LOG.isDebugEnabled()) {
- LOG.info("Crawl delay for queue: " + fit.queueID + " is set to " + fiq.crawlDelay + " as per robots.txt. url: " + fit.url);
+ LOG.info("Crawl delay for queue: " + fit.queueID
+ + " is set to " + fiq.crawlDelay
+ + " as per robots.txt. url: " + fit.url);
}
}
}
- ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
+ ProtocolOutput output = protocol.getProtocolOutput(fit.url,
+ fit.datum);
ProtocolStatus status = output.getStatus();
Content content = output.getContent();
ParseStatus pstatus = null;
@@ -723,32 +767,31 @@ public class Fetcher extends Configured
reporter.incrCounter("FetcherStatus", status.getName(), 1);
- switch(status.getCode()) {
+ switch (status.getCode()) {
case ProtocolStatus.WOULDBLOCK:
// retry ?
fetchQueues.addFetchItem(fit);
break;
- case ProtocolStatus.SUCCESS: // got a page
- pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
+ case ProtocolStatus.SUCCESS: // got a page
+ pstatus = output(fit.url, fit.datum, content, status,
+ CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
updateStatus(content.getContent().length);
- if (pstatus != null && pstatus.isSuccess() &&
- pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+ if (pstatus != null && pstatus.isSuccess()
+ && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
String newUrl = pstatus.getMessage();
int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
- Text redirUrl =
- handleRedirect(fit.url, fit.datum,
- urlString, newUrl,
- refreshTime < Fetcher.PERM_REFRESH_TIME,
- Fetcher.CONTENT_REDIR);
+ Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+ newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME,
+ Fetcher.CONTENT_REDIR);
if (redirUrl != null) {
queueRedirect(redirUrl, fit);
}
}
break;
- case ProtocolStatus.MOVED: // redirect
+ case ProtocolStatus.MOVED: // redirect
case ProtocolStatus.TEMP_MOVED:
int code;
boolean temp;
@@ -761,10 +804,8 @@ public class Fetcher extends Configured
}
output(fit.url, fit.datum, content, status, code);
String newUrl = status.getMessage();
- Text redirUrl =
- handleRedirect(fit.url, fit.datum,
- urlString, newUrl, temp,
- Fetcher.PROTOCOL_REDIR);
+ Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+ newUrl, temp, Fetcher.PROTOCOL_REDIR);
if (redirUrl != null) {
queueRedirect(redirUrl, fit);
} else {
@@ -775,31 +816,37 @@ public class Fetcher extends Configured
case ProtocolStatus.EXCEPTION:
logError(fit.url, status.getMessage());
- int killedURLs = fetchQueues.checkExceptionThreshold(fit.getQueueID());
- if (killedURLs!=0)
- reporter.incrCounter("FetcherStatus", "AboveExceptionThresholdInQueue", killedURLs);
+ int killedURLs = fetchQueues.checkExceptionThreshold(fit
+ .getQueueID());
+ if (killedURLs != 0)
+ reporter.incrCounter("FetcherStatus",
+ "AboveExceptionThresholdInQueue", killedURLs);
/* FALLTHROUGH */
- case ProtocolStatus.RETRY: // retry
+ case ProtocolStatus.RETRY: // retry
case ProtocolStatus.BLOCKED:
- output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_RETRY);
break;
- case ProtocolStatus.GONE: // gone
+ case ProtocolStatus.GONE: // gone
case ProtocolStatus.NOTFOUND:
case ProtocolStatus.ACCESS_DENIED:
case ProtocolStatus.ROBOTS_DENIED:
- output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_GONE);
break;
case ProtocolStatus.NOTMODIFIED:
- output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_NOTMODIFIED);
break;
default:
if (LOG.isWarnEnabled()) {
LOG.warn("Unknown ProtocolStatus: " + status.getCode());
}
- output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_RETRY);
}
if (redirecting && redirectCount > maxRedirect) {
@@ -807,34 +854,38 @@ public class Fetcher extends Configured
if (LOG.isInfoEnabled()) {
LOG.info(" - redirect count exceeded " + fit.url);
}
- output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE);
+ output(fit.url, fit.datum, null,
+ ProtocolStatus.STATUS_REDIR_EXCEEDED,
+ CrawlDatum.STATUS_FETCH_GONE);
}
} while (redirecting && (redirectCount <= maxRedirect));
- } catch (Throwable t) { // unexpected exception
+ } catch (Throwable t) { // unexpected exception
// unblock
fetchQueues.finishFetchItem(fit);
logError(fit.url, StringUtils.stringifyException(t));
- output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY);
+ output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
+ CrawlDatum.STATUS_FETCH_RETRY);
}
}
} catch (Throwable e) {
if (LOG.isErrorEnabled()) {
- LOG.error("fetcher caught:"+e.toString());
+ LOG.error("fetcher caught:" + e.toString());
}
} finally {
- if (fit != null) fetchQueues.finishFetchItem(fit);
+ if (fit != null)
+ fetchQueues.finishFetchItem(fit);
activeThreads.decrementAndGet(); // count threads
- LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
+ LOG.info("-finishing thread " + getName() + ", activeThreads="
+ + activeThreads);
}
}
- private Text handleRedirect(Text url, CrawlDatum datum,
- String urlString, String newUrl,
- boolean temp, String redirType)
- throws MalformedURLException, URLFilterException {
+ private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
+ String newUrl, boolean temp, String redirType)
+ throws MalformedURLException, URLFilterException {
newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
newUrl = urlFilters.filter(newUrl);
@@ -844,13 +895,14 @@ public class Fetcher extends Configured
String newHost = new URL(newUrl).getHost().toLowerCase();
if (!origHost.equals(newHost)) {
if (LOG.isDebugEnabled()) {
- LOG.debug(" - ignoring redirect " + redirType + " from " +
- urlString + " to " + newUrl +
- " because external links are ignored");
+ LOG.debug(" - ignoring redirect " + redirType + " from "
+ + urlString + " to " + newUrl
+ + " because external links are ignored");
}
return null;
}
- } catch (MalformedURLException e) { }
+ } catch (MalformedURLException e) {
+ }
}
if (newUrl != null && !newUrl.equals(urlString)) {
@@ -860,13 +912,13 @@ public class Fetcher extends Configured
redirecting = true;
redirectCount++;
if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect to " +
- url + " (fetching now)");
+ LOG.debug(" - " + redirType + " redirect to " + url
+ + " (fetching now)");
}
return url;
} else {
CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
- datum.getFetchInterval(),datum.getScore());
+ datum.getFetchInterval(), datum.getScore());
// transfer existing metadata
newDatum.getMetaData().putAll(datum.getMetaData());
try {
@@ -880,21 +932,22 @@ public class Fetcher extends Configured
}
output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect to " +
- url + " (fetching later)");
+ LOG.debug(" - " + redirType + " redirect to " + url
+ + " (fetching later)");
}
return null;
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect skipped: " +
- (newUrl != null ? "to same url" : "filtered"));
+ LOG.debug(" - " + redirType + " redirect skipped: "
+ + (newUrl != null ? "to same url" : "filtered"));
}
return null;
}
}
- private void queueRedirect(Text redirUrl, FetchItem fit) throws ScoringFilterException {
+ private void queueRedirect(Text redirUrl, FetchItem fit)
+ throws ScoringFilterException {
CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
fit.datum.getFetchInterval(), fit.datum.getScore());
// transfer all existing metadata to the redirect
@@ -906,13 +959,13 @@ public class Fetcher extends Configured
}
fit = FetchItem.create(redirUrl, newDatum, queueMode);
if (fit != null) {
- FetchItemQueue fiq =
- fetchQueues.getFetchItemQueue(fit.queueID);
+ FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
fiq.addInProgressFetchItem(fit);
} else {
// stop redirecting
redirecting = false;
- reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", 1);
+ reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect",
+ 1);
}
}
@@ -923,26 +976,29 @@ public class Fetcher extends Configured
errors.incrementAndGet();
}
- private ParseStatus output(Text key, CrawlDatum datum,
- Content content, ProtocolStatus pstatus, int status) {
+ private ParseStatus output(Text key, CrawlDatum datum, Content content,
+ ProtocolStatus pstatus, int status) {
return output(key, datum, content, pstatus, status, 0);
}
- private ParseStatus output(Text key, CrawlDatum datum,
- Content content, ProtocolStatus pstatus, int status, int outlinkDepth) {
+ private ParseStatus output(Text key, CrawlDatum datum, Content content,
+ ProtocolStatus pstatus, int status, int outlinkDepth) {
datum.setStatus(status);
datum.setFetchTime(System.currentTimeMillis());
- if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
-
+ if (pstatus != null)
+ datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+
ParseResult parseResult = null;
if (content != null) {
Metadata metadata = content.getMetadata();
-
+
// store the guessed content type in the crawldatum
- if (content.getContentType() != null) datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE), new Text(content.getContentType()));
-
+ if (content.getContentType() != null)
+ datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
+ new Text(content.getContentType()));
+
// add segment to metadata
metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
// add score to content metadata so that ParseSegment can pick it up.
@@ -953,29 +1009,34 @@ public class Fetcher extends Configured
LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
}
}
- /* Note: Fetcher will only follow meta-redirects coming from the
- * original URL. */
+ /*
+ * Note: Fetcher will only follow meta-redirects coming from the
+ * original URL.
+ */
if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
- if (!skipTruncated || (skipTruncated && !ParseSegment.isTruncated(content))) {
+ if (!skipTruncated
+ || (skipTruncated && !ParseSegment.isTruncated(content))) {
try {
parseResult = this.parseUtil.parse(content);
} catch (Exception e) {
- LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
+ LOG.warn("Error parsing: " + key + ": "
+ + StringUtils.stringifyException(e));
}
}
-
+
if (parseResult == null) {
- byte[] signature =
- SignatureFactory.getSignature(getConf()).calculate(content,
- new ParseStatus().getEmptyParse(conf));
+ byte[] signature = SignatureFactory.getSignature(getConf())
+ .calculate(content, new ParseStatus().getEmptyParse(conf));
datum.setSignature(signature);
}
}
- /* Store status code in content So we can read this value during
- * parsing (as a separate job) and decide to parse or not.
+ /*
+ * Store status code in content So we can read this value during parsing
+ * (as a separate job) and decide to parse or not.
*/
- content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
+ content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
+ Integer.toString(status));
}
try {
@@ -996,11 +1057,10 @@ public class Fetcher extends Configured
// Calculate page signature. For non-parsing fetchers this will
// be done in ParseSegment
- byte[] signature =
- SignatureFactory.getSignature(getConf()).calculate(content, parse);
+ byte[] signature = SignatureFactory.getSignature(getConf())
+ .calculate(content, parse);
// Ensure segment name and score are in parseData metadata
- parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
- segmentName);
+ parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
StringUtil.toHexString(signature));
// Pass fetch time to content meta
@@ -1039,7 +1099,8 @@ public class Fetcher extends Configured
for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
String toUrl = links[i].getToUrl();
- toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl, fromHost, ignoreExternalLinks, urlFilters, normalizers);
+ toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
+ fromHost, ignoreExternalLinks, urlFilters, normalizers);
if (toUrl == null) {
continue;
}
@@ -1052,49 +1113,57 @@ public class Fetcher extends Configured
// Only process depth N outlinks
if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
- reporter.incrCounter("FetcherOutlinks", "outlinks_detected", outlinks.size());
+ reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
+ outlinks.size());
// Counter to limit num outlinks to follow per page
int outlinkCounter = 0;
- // Calculate variable number of outlinks by depth using the divisor (outlinks = Math.floor(divisor / depth * num.links))
- int maxOutlinksByDepth = (int)Math.floor(outlinksDepthDivisor / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
+ // Calculate variable number of outlinks by depth using the
+ // divisor (outlinks = Math.floor(divisor / depth * num.links))
+ int maxOutlinksByDepth = (int) Math.floor(outlinksDepthDivisor
+ / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
String followUrl;
// Walk over the outlinks and add as new FetchItem to the queues
Iterator<String> iter = outlinks.iterator();
- while(iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
+ while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
followUrl = iter.next();
// Check whether we'll follow external outlinks
if (outlinksIgnoreExternal) {
- if (!URLUtil.getHost(url.toString()).equals(URLUtil.getHost(followUrl))) {
+ if (!URLUtil.getHost(url.toString()).equals(
+ URLUtil.getHost(followUrl))) {
continue;
}
}
- reporter.incrCounter("FetcherOutlinks", "outlinks_following", 1);
+ reporter
+ .incrCounter("FetcherOutlinks", "outlinks_following", 1);
// Create new FetchItem with depth incremented
- FetchItem fit = FetchItem.create(new Text(followUrl), new CrawlDatum(CrawlDatum.STATUS_LINKED, interval), queueMode, outlinkDepth + 1);
+ FetchItem fit = FetchItem.create(new Text(followUrl),
+ new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
+ queueMode, outlinkDepth + 1);
fetchQueues.addFetchItem(fit);
outlinkCounter++;
}
}
- // Overwrite the outlinks in ParseData with the normalized and filtered set
- parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList.size()]));
+ // Overwrite the outlinks in ParseData with the normalized and
+ // filtered set
+ parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList
+ .size()]));
- output.collect(url, new NutchWritable(
- new ParseImpl(new ParseText(parse.getText()),
- parseData, parse.isCanonical())));
+ output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+ parse.getText()), parseData, parse.isCanonical())));
}
}
} catch (IOException e) {
if (LOG.isErrorEnabled()) {
- LOG.error("fetcher caught:"+e.toString());
+ LOG.error("fetcher caught:" + e.toString());
}
}
@@ -1102,7 +1171,8 @@ public class Fetcher extends Configured
if (parseResult != null && !parseResult.isEmpty()) {
Parse p = parseResult.get(content.getUrl());
if (p != null) {
- reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p.getData().getStatus().getMajorCode()], 1);
+ reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p
+ .getData().getStatus().getMajorCode()], 1);
return p.getData().getStatus();
}
}
@@ -1116,33 +1186,39 @@ public class Fetcher extends Configured
public synchronized boolean isHalted() {
return halted;
}
-
+
}
- public Fetcher() { super(null); }
+ public Fetcher() {
+ super(null);
+ }
- public Fetcher(Configuration conf) { super(conf); }
+ public Fetcher(Configuration conf) {
+ super(conf);
+ }
private void updateStatus(int bytesInPage) throws IOException {
pages.incrementAndGet();
bytes.addAndGet(bytesInPage);
}
-
- private void reportStatus(int pagesLastSec, int bytesLastSec) throws IOException {
+ private void reportStatus(int pagesLastSec, int bytesLastSec)
+ throws IOException {
StringBuilder status = new StringBuilder();
- Long elapsed = new Long((System.currentTimeMillis() - start)/1000);
+ Long elapsed = new Long((System.currentTimeMillis() - start) / 1000);
- float avgPagesSec = (float) pages.get() / elapsed.floatValue();
- long avgBytesSec = (bytes.get() /125l) / elapsed.longValue();
+ float avgPagesSec = (float) pages.get() / elapsed.floatValue();
+ long avgBytesSec = (bytes.get() / 125l) / elapsed.longValue();
- status.append(activeThreads).append(" threads (").append(spinWaiting.get()).append(" waiting), ");
+ status.append(activeThreads).append(" threads (").append(spinWaiting.get())
+ .append(" waiting), ");
status.append(fetchQueues.getQueueCount()).append(" queues, ");
status.append(fetchQueues.getTotalSize()).append(" URLs queued, ");
status.append(pages).append(" pages, ").append(errors).append(" errors, ");
status.append(String.format("%.2f", avgPagesSec)).append(" pages/s (");
status.append(pagesLastSec).append(" last sec), ");
- status.append(avgBytesSec).append(" kbits/s (").append((bytesLastSec / 125)).append(" last sec)");
+ status.append(avgBytesSec).append(" kbits/s (")
+ .append((bytesLastSec / 125)).append(" last sec)");
reporter.setStatus(status.toString());
}
@@ -1154,12 +1230,13 @@ public class Fetcher extends Configured
this.storingContent = isStoringContent(job);
this.parsing = isParsing(job);
-// if (job.getBoolean("fetcher.verbose", false)) {
-// LOG.setLevel(Level.FINE);
-// }
+ // if (job.getBoolean("fetcher.verbose", false)) {
+ // LOG.setLevel(Level.FINE);
+ // }
}
- public void close() {}
+ public void close() {
+ }
public static boolean isParsing(Configuration conf) {
return conf.getBoolean("fetcher.parse", true);
@@ -1170,43 +1247,53 @@ public class Fetcher extends Configured
}
public void run(RecordReader<Text, CrawlDatum> input,
- OutputCollector<Text, NutchWritable> output,
- Reporter reporter) throws IOException {
+ OutputCollector<Text, NutchWritable> output, Reporter reporter)
+ throws IOException {
this.output = output;
this.reporter = reporter;
this.fetchQueues = new FetchItemQueues(getConf());
int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
- if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: threads: " + threadCount);
+ }
int timeoutDivisor = getConf().getInt("fetcher.threads.timeout.divisor", 2);
- if (LOG.isInfoEnabled()) { LOG.info("Fetcher: time-out divisor: " + timeoutDivisor); }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: time-out divisor: " + timeoutDivisor);
+ }
- int queueDepthMuliplier = getConf().getInt("fetcher.queue.depth.multiplier", 50);
+ int queueDepthMuliplier = getConf().getInt(
+ "fetcher.queue.depth.multiplier", 50);
- feeder = new QueueFeeder(input, fetchQueues, threadCount * queueDepthMuliplier);
- //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
+ feeder = new QueueFeeder(input, fetchQueues, threadCount
+ * queueDepthMuliplier);
+ // feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
- // the value of the time limit is either -1 or the time where it should finish
+ // the value of the time limit is either -1 or the time where it should
+ // finish
long timelimit = getConf().getLong("fetcher.timelimit", -1);
- if (timelimit != -1) feeder.setTimeLimit(timelimit);
+ if (timelimit != -1)
+ feeder.setTimeLimit(timelimit);
feeder.start();
// set non-blocking & no-robots mode for HTTP protocol plugins.
getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
- for (int i = 0; i < threadCount; i++) { // spawn threads
+ for (int i = 0; i < threadCount; i++) { // spawn threads
FetcherThread t = new FetcherThread(getConf());
fetcherThreads.add(t);
t.start();
}
// select a timeout that avoids a task timeout
- long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/timeoutDivisor;
+ long timeout = getConf().getInt("mapred.task.timeout", 10 * 60 * 1000)
+ / timeoutDivisor;
- // Used for threshold check, holds pages and bytes processed in the last second
+ // Used for threshold check, holds pages and bytes processed in the last
+ // second
int pagesLastSec;
int bytesLastSec;
@@ -1214,57 +1301,74 @@ public class Fetcher extends Configured
boolean throughputThresholdExceeded = false;
int throughputThresholdNumRetries = 0;
- int throughputThresholdPages = getConf().getInt("fetcher.throughput.threshold.pages", -1);
- if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); }
- int throughputThresholdMaxRetries = getConf().getInt("fetcher.throughput.threshold.retries", 5);
- if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold retries: " + throughputThresholdMaxRetries); }
- long throughputThresholdTimeLimit = getConf().getLong("fetcher.throughput.threshold.check.after", -1);
-
+ int throughputThresholdPages = getConf().getInt(
+ "fetcher.throughput.threshold.pages", -1);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages);
+ }
+ int throughputThresholdMaxRetries = getConf().getInt(
+ "fetcher.throughput.threshold.retries", 5);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: throughput threshold retries: "
+ + throughputThresholdMaxRetries);
+ }
+ long throughputThresholdTimeLimit = getConf().getLong(
+ "fetcher.throughput.threshold.check.after", -1);
+
int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 1000;
int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", threadCount);
- if (maxNumThreads < threadCount){
- LOG.info("fetcher.maxNum.threads can't be < than "+ threadCount + " : using "+threadCount+" instead");
+ if (maxNumThreads < threadCount) {
+ LOG.info("fetcher.maxNum.threads can't be < than " + threadCount
+ + " : using " + threadCount + " instead");
maxNumThreads = threadCount;
}
- int bandwidthTargetCheckEveryNSecs = getConf().getInt("fetcher.bandwidth.target.check.everyNSecs", 30);
- if (bandwidthTargetCheckEveryNSecs < 1){
+ int bandwidthTargetCheckEveryNSecs = getConf().getInt(
+ "fetcher.bandwidth.target.check.everyNSecs", 30);
+ if (bandwidthTargetCheckEveryNSecs < 1) {
LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : using 1 instead");
bandwidthTargetCheckEveryNSecs = 1;
}
-
+
int maxThreadsPerQueue = getConf().getInt("fetcher.threads.per.queue", 1);
-
+
int bandwidthTargetCheckCounter = 0;
long bytesAtLastBWTCheck = 0l;
-
- do { // wait for threads to exit
+
+ do { // wait for threads to exit
pagesLastSec = pages.get();
- bytesLastSec = (int)bytes.get();
+ bytesLastSec = (int) bytes.get();
try {
Thread.sleep(1000);
- } catch (InterruptedException e) {}
+ } catch (InterruptedException e) {
+ }
pagesLastSec = pages.get() - pagesLastSec;
- bytesLastSec = (int)bytes.get() - bytesLastSec;
+ bytesLastSec = (int) bytes.get() - bytesLastSec;
reporter.incrCounter("FetcherStatus", "bytes_downloaded", bytesLastSec);
reportStatus(pagesLastSec, bytesLastSec);
- LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
- + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize()+ ", fetchQueues.getQueueCount="+fetchQueues.getQueueCount());
+ LOG.info("-activeThreads=" + activeThreads + ", spinWaiting="
+ + spinWaiting.get() + ", fetchQueues.totalSize="
+ + fetchQueues.getTotalSize() + ", fetchQueues.getQueueCount="
+ + fetchQueues.getQueueCount());
if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
fetchQueues.dump();
}
// if throughput threshold is enabled
- if (throughputThresholdTimeLimit < System.currentTimeMillis() && throughputThresholdPages != -1) {
+ if (throughputThresholdTimeLimit < System.currentTimeMillis()
+ && throughputThresholdPages != -1) {
// Check if we're dropping below the threshold
if (pagesLastSec < throughputThresholdPages) {
throughputThresholdNumRetries++;
- LOG.warn(Integer.toString(throughputThresholdNumRetries) + ": dropping below configured threshold of " + Integer.toString(throughputThresholdPages) + " pages per second");
+ LOG.warn(Integer.toString(throughputThresholdNumRetries)
+ + ": dropping below configured threshold of "
+ + Integer.toString(throughputThresholdPages)
+ + " pages per second");
// Quit if we dropped below threshold too many times
if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
@@ -1273,42 +1377,55 @@ public class Fetcher extends Configured
// Disable the threshold checker
throughputThresholdPages = -1;
- // Empty the queues cleanly and get number of items that were dropped
+ // Empty the queues cleanly and get number of items that were
+ // dropped
int hitByThrougputThreshold = fetchQueues.emptyQueues();
- if (hitByThrougputThreshold != 0) reporter.incrCounter("FetcherStatus",
- "hitByThrougputThreshold", hitByThrougputThreshold);
+ if (hitByThrougputThreshold != 0)
+ reporter.incrCounter("FetcherStatus", "hitByThrougputThreshold",
+ hitByThrougputThreshold);
}
}
}
-
+
// adjust the number of threads if a target bandwidth has been set
- if (targetBandwidth>0) {
- if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs) bandwidthTargetCheckCounter++;
- else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs){
- long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)/bandwidthTargetCheckEveryNSecs;
+ if (targetBandwidth > 0) {
+ if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs)
+ bandwidthTargetCheckCounter++;
+ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
+ long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)
+ / bandwidthTargetCheckEveryNSecs;
bytesAtLastBWTCheck = bytes.get();
bandwidthTargetCheckCounter = 0;
int averageBdwPerThread = 0;
- if (activeThreads.get()>0)
- averageBdwPerThread = Math.round(bpsSinceLastCheck/activeThreads.get());
+ if (activeThreads.get() > 0)
+ averageBdwPerThread = Math.round(bpsSinceLastCheck
+ / activeThreads.get());
- LOG.info("averageBdwPerThread : "+(averageBdwPerThread/1000) + " kbps");
+ LOG.info("averageBdwPerThread : " + (averageBdwPerThread / 1000)
+ + " kbps");
- if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0){
+ if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0) {
// check whether it is worth doing e.g. more queues than threads
- if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads.get()){
-
+ if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads
+ .get()) {
+
long remainingBdw = targetBandwidth - bpsSinceLastCheck;
- int additionalThreads = Math.round(remainingBdw/averageBdwPerThread);
+ int additionalThreads = Math.round(remainingBdw
+ / averageBdwPerThread);
int availableThreads = maxNumThreads - activeThreads.get();
- // determine the number of available threads (min between availableThreads and additionalThreads)
- additionalThreads = (availableThreads < additionalThreads ? availableThreads:additionalThreads);
- LOG.info("Has space for more threads ("+(bpsSinceLastCheck/1000) +" vs "+(targetBandwidth/1000)+" kbps) \t=> adding "+additionalThreads+" new threads");
+ // determine the number of available threads (min between
+ // availableThreads and additionalThreads)
+ additionalThreads = (availableThreads < additionalThreads ? availableThreads
+ : additionalThreads);
+ LOG.info("Has space for more threads ("
+ + (bpsSinceLastCheck / 1000) + " vs "
+ + (targetBandwidth / 1000) + " kbps) \t=> adding "
+ + additionalThreads + " new threads");
// activate new threads
for (int i = 0; i < additionalThreads; i++) {
FetcherThread thread = new FetcherThread(getConf());
@@ -1316,14 +1433,18 @@ public class Fetcher extends Configured
thread.start();
}
}
- }
- else if (bpsSinceLastCheck > targetBandwidth && averageBdwPerThread > 0){
- // if the bandwidth we're using is greater then the expected bandwidth, we have to stop some threads
+ } else if (bpsSinceLastCheck > targetBandwidth
+ && averageBdwPerThread > 0) {
+ // if the bandwidth we're using is greater then the expected
+ // bandwidth, we have to stop some threads
long excessBdw = bpsSinceLastCheck - targetBandwidth;
- int excessThreads = Math.round(excessBdw/averageBdwPerThread);
- LOG.info("Exceeding target bandwidth ("+bpsSinceLastCheck/1000 +" vs "+(targetBandwidth/1000)+" kbps). \t=> excessThreads = "+excessThreads);
+ int excessThreads = Math.round(excessBdw / averageBdwPerThread);
+ LOG.info("Exceeding target bandwidth (" + bpsSinceLastCheck / 1000
+ + " vs " + (targetBandwidth / 1000)
+ + " kbps). \t=> excessThreads = " + excessThreads);
// keep at least one
- if (excessThreads >= fetcherThreads.size()) excessThreads = 0;
+ if (excessThreads >= fetcherThreads.size())
+ excessThreads = 0;
// de-activates threads
for (int i = 0; i < excessThreads; i++) {
FetcherThread thread = fetcherThreads.removeLast();
@@ -1336,18 +1457,20 @@ public class Fetcher extends Configured
// check timelimit
if (!feeder.isAlive()) {
int hitByTimeLimit = fetchQueues.checkTimelimit();
- if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus",
- "hitByTimeLimit", hitByTimeLimit);
+ if (hitByTimeLimit != 0)
+ reporter.incrCounter("FetcherStatus", "hitByTimeLimit",
+ hitByTimeLimit);
}
// some requests seem to hang, despite all intentions
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
if (LOG.isWarnEnabled()) {
- LOG.warn("Aborting with "+activeThreads+" hung threads.");
+ LOG.warn("Aborting with " + activeThreads + " hung threads.");
for (int i = 0; i < fetcherThreads.size(); i++) {
FetcherThread thread = fetcherThreads.get(i);
if (thread.isAlive()) {
- LOG.warn("Thread #" + i + " hung while processing " + thread.reprUrl);
+ LOG.warn("Thread #" + i + " hung while processing "
+ + thread.reprUrl);
if (LOG.isDebugEnabled()) {
StackTraceElement[] stack = thread.getStackTrace();
StringBuilder sb = new StringBuilder();
@@ -1368,8 +1491,7 @@ public class Fetcher extends Configured
}
- public void fetch(Path segment, int threads)
- throws IOException {
+ public void fetch(Path segment, int threads) throws IOException {
checkConfiguration();
@@ -1390,24 +1512,31 @@ public class Fetcher extends Configured
getConf().setLong("fetcher.timelimit", timelimit);
}
- // Set the time limit after which the throughput threshold feature is enabled
- timelimit = getConf().getLong("fetcher.throughput.threshold.check.after", 10);
+ // Set the time limit after which the throughput threshold feature is
+ // enabled
+ timelimit = getConf().getLong("fetcher.throughput.threshold.check.after",
+ 10);
timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
getConf().setLong("fetcher.throughput.threshold.check.after", timelimit);
int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1);
if (maxOutlinkDepth > 0) {
- LOG.info("Fetcher: following outlinks up to depth: " + Integer.toString(maxOutlinkDepth));
+ LOG.info("Fetcher: following outlinks up to depth: "
+ + Integer.toString(maxOutlinkDepth));
- int maxOutlinkDepthNumLinks = getConf().getInt("fetcher.follow.outlinks.num.links", 4);
- int outlinksDepthDivisor = getConf().getInt("fetcher.follow.outlinks.depth.divisor", 2);
+ int maxOutlinkDepthNumLinks = getConf().getInt(
+ "fetcher.follow.outlinks.num.links", 4);
+ int outlinksDepthDivisor = getConf().getInt(
+ "fetcher.follow.outlinks.depth.divisor", 2);
int totalOutlinksToFollow = 0;
for (int i = 0; i < maxOutlinkDepth; i++) {
- totalOutlinksToFollow += (int)Math.floor(outlinksDepthDivisor / (i + 1) * maxOutlinkDepthNumLinks);
+ totalOutlinksToFollow += (int) Math.floor(outlinksDepthDivisor
+ / (i + 1) * maxOutlinkDepthNumLinks);
}
- LOG.info("Fetcher: maximum outlinks to follow: " + Integer.toString(totalOutlinksToFollow));
+ LOG.info("Fetcher: maximum outlinks to follow: "
+ + Integer.toString(totalOutlinksToFollow));
}
JobConf job = new NutchJob(getConf());
@@ -1419,7 +1548,8 @@ public class Fetcher extends Configured
// for politeness, don't permit parallel execution of a single task
job.setSpeculativeExecution(false);
- FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.GENERATE_DIR_NAME));
job.setInputFormat(InputFormat.class);
job.setMapRunnerClass(Fetcher.class);
@@ -1432,10 +1562,10 @@ public class Fetcher extends Configured
JobClient.runJob(job);
long end = System.currentTimeMillis();
- LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+ LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
}
-
/** Run the fetcher. */
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(), args);
@@ -1456,9 +1586,9 @@ public class Fetcher extends Configured
int threads = getConf().getInt("fetcher.threads.fetch", 10);
boolean parsing = false;
- for (int i = 1; i < args.length; i++) { // parse command line
- if (args[i].equals("-threads")) { // found -threads option
- threads = Integer.parseInt(args[++i]);
+ for (int i = 1; i < args.length; i++) { // parse command line
+ if (args[i].equals("-threads")) { // found -threads option
+ threads = Integer.parseInt(args[++i]);
}
}
Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java Thu Jan 29 05:38:59 2015
@@ -48,74 +48,68 @@ public class FetcherOutputFormat impleme
public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
Path out = FileOutputFormat.getOutputPath(job);
if ((out == null) && (job.getNumReduceTasks() != 0)) {
- throw new InvalidJobConfException(
- "Output directory not set in JobConf.");
+ throw new InvalidJobConfException("Output directory not set in JobConf.");
}
if (fs == null) {
- fs = out.getFileSystem(job);
+ fs = out.getFileSystem(job);
}
if (fs.exists(new Path(out, CrawlDatum.FETCH_DIR_NAME)))
- throw new IOException("Segment already fetched!");
+ throw new IOException("Segment already fetched!");
}
public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
- final JobConf job,
- final String name,
- final Progressable progress) throws IOException {
+ final JobConf job, final String name, final Progressable progress)
+ throws IOException {
Path out = FileOutputFormat.getOutputPath(job);
- final Path fetch =
- new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
- final Path content =
- new Path(new Path(out, Content.DIR_NAME), name);
-
- final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
-
- final MapFile.Writer fetchOut =
- new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
- compType, progress);
-
+ final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
+ final Path content = new Path(new Path(out, Content.DIR_NAME), name);
+
+ final CompressionType compType = SequenceFileOutputFormat
+ .getOutputCompressionType(job);
+
+ final MapFile.Writer fetchOut = new MapFile.Writer(job, fs,
+ fetch.toString(), Text.class, CrawlDatum.class, compType, progress);
+
return new RecordWriter<Text, NutchWritable>() {
- private MapFile.Writer contentOut;
- private RecordWriter<Text, Parse> parseOut;
+ private MapFile.Writer contentOut;
+ private RecordWriter<Text, Parse> parseOut;
- {
- if (Fetcher.isStoringContent(job)) {
- contentOut = new MapFile.Writer(job, fs, content.toString(),
- Text.class, Content.class,
- compType, progress);
- }
-
- if (Fetcher.isParsing(job)) {
- parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
- }
+ {
+ if (Fetcher.isStoringContent(job)) {
+ contentOut = new MapFile.Writer(job, fs, content.toString(),
+ Text.class, Content.class, compType, progress);
}
- public void write(Text key, NutchWritable value)
- throws IOException {
-
- Writable w = value.get();
-
- if (w instanceof CrawlDatum)
- fetchOut.append(key, w);
- else if (w instanceof Content && contentOut != null)
- contentOut.append(key, w);
- else if (w instanceof Parse && parseOut != null)
- parseOut.write(key, (Parse)w);
+ if (Fetcher.isParsing(job)) {
+ parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name,
+ progress);
}
+ }
+
+ public void write(Text key, NutchWritable value) throws IOException {
- public void close(Reporter reporter) throws IOException {
- fetchOut.close();
- if (contentOut != null) {
- contentOut.close();
- }
- if (parseOut != null) {
- parseOut.close(reporter);
- }
+ Writable w = value.get();
+
+ if (w instanceof CrawlDatum)
+ fetchOut.append(key, w);
+ else if (w instanceof Content && contentOut != null)
+ contentOut.append(key, w);
+ else if (w instanceof Parse && parseOut != null)
+ parseOut.write(key, (Parse) w);
+ }
+
+ public void close(Reporter reporter) throws IOException {
+ fetchOut.close();
+ if (contentOut != null) {
+ contentOut.close();
+ }
+ if (parseOut != null) {
+ parseOut.close(reporter);
}
+ }
- };
+ };
- }
+ }
}
-