You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ab...@apache.org on 2009/12/01 15:50:15 UTC

svn commit: r885776 - in /lucene/nutch/trunk: conf/nutch-default.xml src/java/org/apache/nutch/fetcher/Fetcher.java

Author: ab
Date: Tue Dec  1 14:50:15 2009
New Revision: 885776

URL: http://svn.apache.org/viewvc?rev=885776&view=rev
Log:
NUTCH-770 Timebomb for Fetcher.

Modified:
    lucene/nutch/trunk/conf/nutch-default.xml
    lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java

Modified: lucene/nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/conf/nutch-default.xml?rev=885776&r1=885775&r2=885776&view=diff
==============================================================================
--- lucene/nutch/trunk/conf/nutch-default.xml (original)
+++ lucene/nutch/trunk/conf/nutch-default.xml Tue Dec  1 14:50:15 2009
@@ -601,6 +601,15 @@
   <description>If true, fetcher will store content.</description>
 </property>
 
+<property>
+  <name>fetcher.timelimit.mins</name>
+  <value>-1</value>
+  <description>This is the number of minutes allocated to the fetching.
+  Once this value is reached, any remaining entry from the input URL list is skipped 
+  and all active queues are emptied. The default value of -1 deactivates the time limit.
+  </description>
+</property>
+
 <!-- indexer properties -->
 
 <property>
@@ -1277,4 +1286,14 @@
   </description>
 </property>
 
+<!-- solr index properties -->
+<property>
+  <name>solrindex.mapping.file</name>
+  <value>solrindex-mapping.xml</value>
+  <description>
+  Defines the name of the file that will be used in the mapping of internal
+  nutch field names to solr index fields as specified in the target Solr schema.
+  </description>
+</property>
+
 </configuration>

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=885776&r1=885775&r2=885776&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Tue Dec  1 14:50:15 2009
@@ -222,6 +222,12 @@
       setEndTime(System.currentTimeMillis() - crawlDelay);
     }
     
+    public synchronized int emptyQueue() {
+      int presize = queue.size();
+      queue.clear();
+      return presize;
+    }
+    
     public int getQueueSize() {
       return queue.size();
     }
@@ -299,6 +305,7 @@
     boolean byIP;
     long crawlDelay;
     long minCrawlDelay;
+    long timelimit = -1;
     Configuration conf;    
     
     public FetchItemQueues(Configuration conf) {
@@ -308,6 +315,7 @@
       this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
       this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
       this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+      this.timelimit = conf.getLong("fetcher.timelimit.mins", -1);
     }
     
     public int getTotalSize() {
@@ -371,6 +379,29 @@
       return null;
     }
     
+    // called only once the feeder has stopped
+    public synchronized int checkTimelimit() {
+      int count = 0;
+      if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+        // emptying the queues
+        for (String id : queues.keySet()) {
+          FetchItemQueue fiq = queues.get(id);
+          if (fiq.getQueueSize() == 0) continue;
+          LOG.info("* queue: " + id + " >> timelimit! ");
+          int deleted = fiq.emptyQueue();
+          for (int i = 0; i < deleted; i++) {
+            totalSize.decrementAndGet();
+          }
+          count += deleted;
+        }
+        // there might also be a case where totalsize !=0 but number of queues
+        // == 0
+        // in which case we simply force it to 0 to avoid blocking
+        if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+      }
+      return count;
+    }
+    
     public synchronized void dump() {
       for (String id : queues.keySet()) {
         FetchItemQueue fiq = queues.get(id);
@@ -389,6 +420,7 @@
     private RecordReader<Text, CrawlDatum> reader;
     private FetchItemQueues queues;
     private int size;
+    private long timelimit = -1;
     
     public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
         FetchItemQueues queues, int size) {
@@ -399,11 +431,29 @@
       this.setName("QueueFeeder");
     }
     
+    public void setTimeLimit(long tl) {
+      timelimit = tl;
+    }
+
     public void run() {
       boolean hasMore = true;
       int cnt = 0;
-      
+      int timelimitcount = 0;
       while (hasMore) {
+        if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+          // enough .. lets' simply
+          // read all the entries from the input without processing them
+          try {
+            Text url = new Text();
+            CrawlDatum datum = new CrawlDatum();
+            hasMore = reader.next(url, datum);
+            timelimitcount++;
+          } catch (IOException e) {
+            LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
+            return;
+          }
+          continue;
+        }
         int feed = size - queues.getTotalSize();
         if (feed <= 0) {
           // queues are full - spin-wait until they have some free space
@@ -430,7 +480,8 @@
           }
         }
       }
-      LOG.info("QueueFeeder finished: total " + cnt + " records.");
+      LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :"
+          + timelimitcount);
     }
   }
   
@@ -899,6 +950,10 @@
 
     feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
     //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
+    
+    // the value of the time limit is either -1 or the time where it should finish
+    long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
+    if (timelimit != -1) feeder.setTimeLimit(timelimit);
     feeder.start();
 
     // set non-blocking & no-robots mode for HTTP protocol plugins.
@@ -924,6 +979,14 @@
       if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
         fetchQueues.dump();
       }
+      
+      // check timelimit
+      if (!feeder.isAlive()) {
+        int hitByTimeLimit = fetchQueues.checkTimelimit();
+        if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus",
+            "hitByTimeLimit", hitByTimeLimit);
+      }
+      
       // some requests seem to hang, despite all intentions
       if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
         if (LOG.isWarnEnabled()) {
@@ -947,6 +1010,16 @@
       LOG.info("Fetcher: segment: " + segment);
     }
 
+    // set the actual time for the timelimit relative
+    // to the beginning of the whole job and not of a specific task
+    // otherwise it keeps trying again if a task fails
+    long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
+    if (timelimit != -1) {
+      timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
+      LOG.info("Fetcher Timelimit set for : " + timelimit);
+      getConf().setLong("fetcher.timelimit.mins", timelimit);
+    }
+        
     JobConf job = new NutchJob(getConf());
     job.setJobName("fetch " + segment);