You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by fe...@apache.org on 2012/05/07 17:29:07 UTC
svn commit: r1335063 - in /nutch/branches/nutchgora: CHANGES.txt
conf/nutch-default.xml src/java/org/apache/nutch/fetcher/FetcherReducer.java
Author: ferdy
Date: Mon May 7 15:29:06 2012
New Revision: 1335063
URL: http://svn.apache.org/viewvc?rev=1335063&view=rev
Log:
NUTCH-1355 nutchgora Configure minimum throughput for fetcher
Modified:
nutch/branches/nutchgora/CHANGES.txt
nutch/branches/nutchgora/conf/nutch-default.xml
nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java
Modified: nutch/branches/nutchgora/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/CHANGES.txt?rev=1335063&r1=1335062&r2=1335063&view=diff
==============================================================================
--- nutch/branches/nutchgora/CHANGES.txt (original)
+++ nutch/branches/nutchgora/CHANGES.txt Mon May 7 15:29:06 2012
@@ -1,6 +1,8 @@
Nutch Change Log
Release nutchgora - Current Development
+* NUTCH-1355 nutchgora Configure minimum throughput for fetcher (ferdy)
+
* NUTCH-1354 nutchgora support fetcher.queue.depth.multiplier property (ferdy)
* NUTCH-1353 nutchgora DomainStatistics support crawlId, counter bug and reformatting (ferdy)
Modified: nutch/branches/nutchgora/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/conf/nutch-default.xml?rev=1335063&r1=1335062&r2=1335063&view=diff
==============================================================================
--- nutch/branches/nutchgora/conf/nutch-default.xml (original)
+++ nutch/branches/nutchgora/conf/nutch-default.xml Mon May 7 15:29:06 2012
@@ -687,6 +687,30 @@
</property>
<property>
+ <name>fetcher.throughput.threshold.pages</name>
+ <value>-1</value>
+ <description>The threshold of minimum pages per second. If the fetcher downloads fewer
+ pages per second than the configured threshold, the fetcher stops, preventing slow queues
+ from stalling the throughput. This threshold must be an integer. This can be useful when
+ fetcher.timelimit.mins is hard to determine. The default value of -1 disables this check.
+ </description>
+</property>
+
+<property>
+ <name>fetcher.throughput.threshold.sequence</name>
+ <value>5</value>
+ <description>The number of consecutive checks in which the throughput is allowed to fall below
+ fetcher.throughput.threshold.pages. This setting prevents accidental slowdowns from stopping the fetcher.
+ </description>
+</property>
+
+<property>
+ <name>fetcher.throughput.threshold.check.after</name>
+ <value>5</value>
+ <description>The number of minutes after which the throughput check is enabled.</description>
+</property>
+
+<property>
<name>fetcher.queue.depth.multiplier</name>
<value>50</value>
<description>(EXPERT)The fetcher buffers the incoming URLs into queues based on the [host|domain|IP]
Modified: nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java?rev=1335063&r1=1335062&r2=1335063&view=diff
==============================================================================
--- nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java (original)
+++ nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java Mon May 7 15:29:06 2012
@@ -380,25 +380,10 @@ extends GoraReducer<IntWritable, FetchEn
}
public synchronized int checkTimelimit() {
- int count = 0;
if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
- // emptying the queues
- for (String id : queues.keySet()) {
- FetchItemQueue fiq = queues.get(id);
- if (fiq.getQueueSize() == 0) continue;
- LOG.info("* queue: " + id + " >> timelimit! ");
- int deleted = fiq.emptyQueue();
- for (int i = 0; i < deleted; i++) {
- totalSize.decrementAndGet();
- }
- count += deleted;
- }
- // there might also be a case where totalsize !=0 but number of queues
- // == 0
- // in which case we simply force it to 0 to avoid blocking
- if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+ return emptyQueues();
}
- return count;
+ return 0;
}
@@ -410,6 +395,29 @@ extends GoraReducer<IntWritable, FetchEn
fiq.dump();
}
}
+
+ // empties the queues (used by timebomb and throughput threshold)
+ public synchronized int emptyQueues() {
+ int count = 0;
+
+ // emptying the queues
+ for (String id : queues.keySet()) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq.getQueueSize() == 0) continue;
+ LOG.info("* queue: " + id + " >> dropping! ");
+ int deleted = fiq.emptyQueue();
+ for (int i = 0; i < deleted; i++) {
+ totalSize.decrementAndGet();
+ }
+ count += deleted;
+ }
+ // there might also be a case where totalsize !=0 but number of queues
+ // == 0
+ // in which case we simply force it to 0 to avoid blocking
+ if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+
+ return count;
+ }
}
/**
@@ -586,7 +594,7 @@ extends GoraReducer<IntWritable, FetchEn
} catch (final Throwable t) { // unexpected exception
// unblock
fetchQueues.finishFetchItem(fit);
- LOG.error(fit.url, t.toString());
+ LOG.error("Unexpected error for " + fit.url, t);
output(fit, null, ProtocolStatusUtils.STATUS_FAILED,
CrawlStatus.STATUS_RETRY);
}
@@ -750,16 +758,21 @@ extends GoraReducer<IntWritable, FetchEn
}
}
- private void reportStatus(Context context) throws IOException {
- StringBuffer status = new StringBuffer();
+ private void reportAndLogStatus(Context context, float actualPages,
+ int actualBytes, int totalSize) throws IOException {
+ StringBuilder status = new StringBuilder();
long elapsed = (System.currentTimeMillis() - start)/1000;
- status.append(spinWaiting).append("/").append(activeThreads).append(" threads spinwaiting\n");
+ status.append(spinWaiting).append("/").append(activeThreads).append(" spinwaiting/active, ");
status.append(pages).append(" pages, ").append(errors).append(" errors, ");
- status.append(Math.round(((float)pages.get()*10)/elapsed)/10.0).append(" pages/s, ");
- status.append(Math.round(((((float)bytes.get())*8)/1024)/elapsed)).append(" kb/s, ");
- status.append(this.fetchQueues.getTotalSize()).append(" URLs in ");
+ status.append(Math.round((((float)pages.get())*10)/elapsed)/10.0).append(" ");
+ status.append(Math.round(((float)actualPages)*10)/10.0).append(" pages/s, ");
+ status.append(Math.round((((float)bytes.get())*8)/1024)/elapsed).append(" ");
+ status.append(Math.round(((float)actualBytes)*8)/1024).append(" kb/s, ");
+ status.append(totalSize).append(" URLs in ");
status.append(this.fetchQueues.getQueueCount()).append(" queues");
- context.setStatus(status.toString());
+ String toString = status.toString();
+ context.setStatus(toString);
+ LOG.info(toString);
}
@Override
@@ -787,24 +800,72 @@ extends GoraReducer<IntWritable, FetchEn
// select a timeout that avoids a task timeout
final long timeout = conf.getInt("mapred.task.timeout", 10*60*1000)/2;
+ // Used for threshold check, holds pages and bytes processed in the last sec
+ float pagesLastSec;
+ int bytesLastSec;
+
+ int throughputThresholdCurrentSequence = 0;
+
+ int throughputThresholdPages = conf.getInt("fetcher.throughput.threshold.pages", -1);
+ if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); }
+ int throughputThresholdSequence = conf.getInt("fetcher.throughput.threshold.sequence", 5);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: throughput threshold sequence: " + throughputThresholdSequence);
+ }
+ long throughputThresholdTimeLimit = conf.getLong("fetcher.throughput.threshold.check.after", -1);
+
do { // wait for threads to exit
+ pagesLastSec = pages.get();
+ bytesLastSec = (int)bytes.get();
+ final int secondsToSleep = 5;
try {
- Thread.sleep(10000);
- } catch (final InterruptedException e) {}
+ Thread.sleep(secondsToSleep * 1000);
+ } catch (InterruptedException e) {}
- context.progress();
- reportStatus(context);
- LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
- + ", fetchQueues= " + fetchQueues.getQueueCount() +", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+ pagesLastSec = (pages.get() - pagesLastSec)/secondsToSleep;
+ bytesLastSec = ((int)bytes.get() - bytesLastSec)/secondsToSleep;
- if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
+ int fetchQueuesTotalSize = fetchQueues.getTotalSize();
+ reportAndLogStatus(context, pagesLastSec, bytesLastSec, fetchQueuesTotalSize);
+
+ boolean feederAlive = feeder.isAlive();
+ if (!feederAlive && fetchQueuesTotalSize < 5) {
fetchQueues.dump();
}
// check timelimit
- if (!feeder.isAlive()) {
+ if (!feederAlive) {
int hitByTimeLimit = fetchQueues.checkTimelimit();
- if (hitByTimeLimit != 0) context.getCounter("FetcherStatus","HitByTimeLimit-Queues").increment(hitByTimeLimit);
+ if (hitByTimeLimit != 0) {
+ context.getCounter("FetcherStatus","HitByTimeLimit-Queues").increment(hitByTimeLimit);
+ }
+ }
+
+ // if throughput threshold is enabled
+ if (throughputThresholdTimeLimit < System.currentTimeMillis() && throughputThresholdPages != -1) {
+ // Check if we're dropping below the threshold
+ if (pagesLastSec < throughputThresholdPages) {
+ throughputThresholdCurrentSequence++;
+ LOG.warn(Integer.toString(throughputThresholdCurrentSequence)
+ + ": dropping below configured threshold of " + Integer.toString(throughputThresholdPages)
+ + " pages per second");
+
+ // Quit if we dropped below threshold too many times
+ if (throughputThresholdCurrentSequence > throughputThresholdSequence) {
+ LOG.warn("Dropped below threshold too many times in a row, killing!");
+
+ // Disable the threshold checker
+ throughputThresholdPages = -1;
+
+ // Empty the queues cleanly and get number of items that were dropped
+ int hitByThrougputThreshold = fetchQueues.emptyQueues();
+
+ if (hitByThrougputThreshold != 0) context.getCounter("FetcherStatus",
+ "hitByThrougputThreshold").increment(hitByThrougputThreshold);
+ }
+ } else {
+ throughputThresholdCurrentSequence = 0;
+ }
}
// some requests seem to hang, despite all intentions