Posted to commits@nutch.apache.org by ab...@apache.org on 2009/12/01 15:50:15 UTC
svn commit: r885776 - in /lucene/nutch/trunk: conf/nutch-default.xml src/java/org/apache/nutch/fetcher/Fetcher.java
Author: ab
Date: Tue Dec 1 14:50:15 2009
New Revision: 885776
URL: http://svn.apache.org/viewvc?rev=885776&view=rev
Log:
NUTCH-770 Timebomb for Fetcher.
Modified:
lucene/nutch/trunk/conf/nutch-default.xml
lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
Modified: lucene/nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/conf/nutch-default.xml?rev=885776&r1=885775&r2=885776&view=diff
==============================================================================
--- lucene/nutch/trunk/conf/nutch-default.xml (original)
+++ lucene/nutch/trunk/conf/nutch-default.xml Tue Dec 1 14:50:15 2009
@@ -601,6 +601,15 @@
<description>If true, fetcher will store content.</description>
</property>
+<property>
+ <name>fetcher.timelimit.mins</name>
+ <value>-1</value>
+ <description>This is the number of minutes allocated to fetching.
+ Once this limit is reached, any remaining entries from the input URL list are skipped
+ and all active queues are emptied. The default value of -1 deactivates the time limit.
+ </description>
+</property>
+
<!-- indexer properties -->
<property>
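Usage note: the time limit is normally enabled by overriding fetcher.timelimit.mins
in nutch-site.xml. A minimal sketch of doing the same programmatically, using the
Hadoop Configuration API that Fetcher itself reads the property with; the 30-minute
value is purely illustrative:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.nutch.util.NutchConfiguration;

  // Illustrative only: give the fetch phase roughly 30 minutes, after which the
  // Fetcher skips the remaining input and drains its queues (see Fetcher.java below).
  Configuration conf = NutchConfiguration.create();
  conf.setLong("fetcher.timelimit.mins", 30);   // -1 (the default) disables the limit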
@@ -1277,4 +1286,14 @@
</description>
</property>
+<!-- solr index properties -->
+<property>
+ <name>solrindex.mapping.file</name>
+ <value>solrindex-mapping.xml</value>
+ <description>
+ Defines the name of the file that will be used in the mapping of internal
+ nutch field names to solr index fields as specified in the target Solr schema.
+ </description>
+</property>
+
</configuration>
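Usage note: a string-valued property like solrindex.mapping.file is resolved through
the same Configuration API used elsewhere in this commit; a hedged one-line sketch
(the indexing code that consumes the mapping is not part of this diff):

  // Hedged sketch: look up the mapping file name, falling back to the default
  // declared in nutch-default.xml above.
  String mappingFile = conf.get("solrindex.mapping.file", "solrindex-mapping.xml");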
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=885776&r1=885775&r2=885776&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Tue Dec 1 14:50:15 2009
@@ -222,6 +222,12 @@
setEndTime(System.currentTimeMillis() - crawlDelay);
}
+ public synchronized int emptyQueue() {
+ int presize = queue.size();
+ queue.clear();
+ return presize;
+ }
+
public int getQueueSize() {
return queue.size();
}
@@ -299,6 +305,7 @@
boolean byIP;
long crawlDelay;
long minCrawlDelay;
+ long timelimit = -1;
Configuration conf;
public FetchItemQueues(Configuration conf) {
@@ -308,6 +315,7 @@
this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+ this.timelimit = conf.getLong("fetcher.timelimit.mins", -1);
}
public int getTotalSize() {
@@ -371,6 +379,29 @@
return null;
}
+ // called only once the feeder has stopped
+ public synchronized int checkTimelimit() {
+ int count = 0;
+ if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+ // emptying the queues
+ for (String id : queues.keySet()) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq.getQueueSize() == 0) continue;
+ LOG.info("* queue: " + id + " >> timelimit! ");
+ int deleted = fiq.emptyQueue();
+ for (int i = 0; i < deleted; i++) {
+ totalSize.decrementAndGet();
+ }
+ count += deleted;
+ }
+ // there might also be a case where totalSize != 0 but the number of queues
+ // is 0, in which case we simply force it to 0 to avoid blocking
+ if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+ }
+ return count;
+ }
+
public synchronized void dump() {
for (String id : queues.keySet()) {
FetchItemQueue fiq = queues.get(id);
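Reading note: the timelimit field compared in checkTimelimit() above is not a number
of minutes but an absolute wall-clock deadline in epoch milliseconds, because the job
driver rewrites fetcher.timelimit.mins before submitting the job (see the last hunk of
this diff). A condensed sketch of the guard, under that assumption:

  // Hedged restatement of the check in checkTimelimit(): -1 means "no limit",
  // otherwise compare the current time against the absolute deadline.
  boolean expired = (timelimit != -1) && (System.currentTimeMillis() >= timelimit);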
@@ -389,6 +420,7 @@
private RecordReader<Text, CrawlDatum> reader;
private FetchItemQueues queues;
private int size;
+ private long timelimit = -1;
public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
FetchItemQueues queues, int size) {
@@ -399,11 +431,29 @@
this.setName("QueueFeeder");
}
+ public void setTimeLimit(long tl) {
+ timelimit = tl;
+ }
+
public void run() {
boolean hasMore = true;
int cnt = 0;
-
+ int timelimitcount = 0;
while (hasMore) {
+ if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+ // enough... let's simply
+ // read all the entries from the input without processing them
+ try {
+ Text url = new Text();
+ CrawlDatum datum = new CrawlDatum();
+ hasMore = reader.next(url, datum);
+ timelimitcount++;
+ } catch (IOException e) {
+ LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
+ return;
+ }
+ continue;
+ }
int feed = size - queues.getTotalSize();
if (feed <= 0) {
// queues are full - spin-wait until they have some free space
@@ -430,7 +480,8 @@
}
}
}
- LOG.info("QueueFeeder finished: total " + cnt + " records.");
+ LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :"
+ + timelimitcount);
}
}
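Design note: past the deadline the feeder does not stop reading; it keeps consuming
input records so the map task terminates normally, it just never queues them. A
condensed, hedged sketch of that drain pattern (the real loop above reads one record
per iteration so it can also serve the normal, pre-deadline path):

  // Illustrative only: consume the rest of the input without fetching it,
  // keeping a count for the final log message and the Hadoop counter.
  Text url = new Text();
  CrawlDatum datum = new CrawlDatum();
  while (reader.next(url, datum)) {
    timelimitcount++;   // counted as "hit by time limit", never fetched
  }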
@@ -899,6 +950,10 @@
feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
//feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
+
+ // the value of the time limit is either -1 or the time at which fetching should finish
+ long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
+ if (timelimit != -1) feeder.setTimeLimit(timelimit);
feeder.start();
// set non-blocking & no-robots mode for HTTP protocol plugins.
@@ -924,6 +979,14 @@
if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
fetchQueues.dump();
}
+
+ // check timelimit
+ if (!feeder.isAlive()) {
+ int hitByTimeLimit = fetchQueues.checkTimelimit();
+ if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus",
+ "hitByTimeLimit", hitByTimeLimit);
+ }
+
// some requests seem to hang, despite all intentions
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
if (LOG.isWarnEnabled()) {
@@ -947,6 +1010,16 @@
LOG.info("Fetcher: segment: " + segment);
}
+ // set the actual deadline for the time limit relative
+ // to the beginning of the whole job and not of a specific task,
+ // otherwise the countdown would restart whenever a failed task is retried
+ long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
+ if (timelimit != -1) {
+ timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
+ LOG.info("Fetcher Timelimit set for : " + timelimit);
+ getConf().setLong("fetcher.timelimit.mins", timelimit);
+ }
+
JobConf job = new NutchJob(getConf());
job.setJobName("fetch " + segment);