You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by fe...@apache.org on 2012/05/07 17:29:07 UTC

svn commit: r1335063 - in /nutch/branches/nutchgora: CHANGES.txt conf/nutch-default.xml src/java/org/apache/nutch/fetcher/FetcherReducer.java

Author: ferdy
Date: Mon May  7 15:29:06 2012
New Revision: 1335063

URL: http://svn.apache.org/viewvc?rev=1335063&view=rev
Log:
NUTCH-1355 nutchgora Configure minimum throughput for fetcher

Modified:
    nutch/branches/nutchgora/CHANGES.txt
    nutch/branches/nutchgora/conf/nutch-default.xml
    nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java

Modified: nutch/branches/nutchgora/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/CHANGES.txt?rev=1335063&r1=1335062&r2=1335063&view=diff
==============================================================================
--- nutch/branches/nutchgora/CHANGES.txt (original)
+++ nutch/branches/nutchgora/CHANGES.txt Mon May  7 15:29:06 2012
@@ -1,6 +1,8 @@
 Nutch Change Log
 
 Release nutchgora - Current Development
+* NUTCH-1355 nutchgora Configure minimum throughput for fetcher
+
 * NUTCH-1354 nutchgora support fetcher.queue.depth.multiplier property (ferdy)
 
 * NUTCH-1353 nutchgora DomainStatistics support crawlId, counter bug and reformatting (ferdy)

Modified: nutch/branches/nutchgora/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/conf/nutch-default.xml?rev=1335063&r1=1335062&r2=1335063&view=diff
==============================================================================
--- nutch/branches/nutchgora/conf/nutch-default.xml (original)
+++ nutch/branches/nutchgora/conf/nutch-default.xml Mon May  7 15:29:06 2012
@@ -687,6 +687,30 @@
 </property>
 
 <property>
+  <name>fetcher.throughput.threshold.pages</name>
+  <value>-1</value>
+  <description>The threshold of minimum pages per second. If the fetcher downloads fewer
+  pages per second than the configured threshold, the fetcher stops, preventing slow queues
+  from stalling the throughput. This threshold must be an integer. This can be useful when
+  fetcher.timelimit.mins is hard to determine. The default value of -1 disables this check.
+  </description>
+</property>
+
+<property>
+  <name>fetcher.throughput.threshold.sequence</name>
+  <value>5</value>
+  <description>The number of times in a row the throughput is allowed to fall below
+  fetcher.throughput.threshold.pages. This setting prevents accidental slowdowns from stopping the fetcher.
+  </description>
+</property>
+
+<property>
+  <name>fetcher.throughput.threshold.check.after</name>
+  <value>5</value>
+  <description>The number of minutes after which the throughput check is enabled.</description>
+</property>
+
+<property>
   <name>fetcher.queue.depth.multiplier</name>
   <value>50</value>
   <description>(EXPERT)The fetcher buffers the incoming URLs into queues based on the [host|domain|IP]

Modified: nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java?rev=1335063&r1=1335062&r2=1335063&view=diff
==============================================================================
--- nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java (original)
+++ nutch/branches/nutchgora/src/java/org/apache/nutch/fetcher/FetcherReducer.java Mon May  7 15:29:06 2012
@@ -380,25 +380,10 @@ extends GoraReducer<IntWritable, FetchEn
     }
     
     public synchronized int checkTimelimit() {
-      int count = 0;
       if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
-        // emptying the queues
-        for (String id : queues.keySet()) {
-          FetchItemQueue fiq = queues.get(id);
-          if (fiq.getQueueSize() == 0) continue;
-          LOG.info("* queue: " + id + " >> timelimit! ");
-          int deleted = fiq.emptyQueue();
-          for (int i = 0; i < deleted; i++) {
-            totalSize.decrementAndGet();
-          }
-          count += deleted;
-        }
-        // there might also be a case where totalsize !=0 but number of queues
-        // == 0
-        // in which case we simply force it to 0 to avoid blocking
-        if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+        return emptyQueues();
       }
-      return count;
+      return 0;
     }
     
 
@@ -410,6 +395,29 @@ extends GoraReducer<IntWritable, FetchEn
         fiq.dump();
       }
     }
+
+    // empties the queues (used by timebomb and throughput threshold)
+    public synchronized int emptyQueues() {
+      int count = 0;
+      
+      // emptying the queues
+      for (String id : queues.keySet()) {
+        FetchItemQueue fiq = queues.get(id);
+        if (fiq.getQueueSize() == 0) continue;
+        LOG.info("* queue: " + id + " >> dropping! ");
+        int deleted = fiq.emptyQueue();
+        for (int i = 0; i < deleted; i++) {
+          totalSize.decrementAndGet();
+        }
+        count += deleted;
+      }
+      // there might also be a case where totalsize !=0 but number of queues
+      // == 0
+      // in which case we simply force it to 0 to avoid blocking
+      if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+
+      return count;
+    }
   }
 
   /**
@@ -586,7 +594,7 @@ extends GoraReducer<IntWritable, FetchEn
           } catch (final Throwable t) {                 // unexpected exception
             // unblock
             fetchQueues.finishFetchItem(fit);
-            LOG.error(fit.url, t.toString());
+            LOG.error("Unexpected error for " + fit.url, t);
             output(fit, null, ProtocolStatusUtils.STATUS_FAILED,
                 CrawlStatus.STATUS_RETRY);
           }
@@ -750,16 +758,21 @@ extends GoraReducer<IntWritable, FetchEn
     }
   }
 
-  private void reportStatus(Context context) throws IOException {
-    StringBuffer status = new StringBuffer();
+  private void reportAndLogStatus(Context context, float actualPages, 
+      int actualBytes, int totalSize) throws IOException {
+    StringBuilder status = new StringBuilder();
     long elapsed = (System.currentTimeMillis() - start)/1000;
-    status.append(spinWaiting).append("/").append(activeThreads).append(" threads spinwaiting\n");
+    status.append(spinWaiting).append("/").append(activeThreads).append(" spinwaiting/active, ");
     status.append(pages).append(" pages, ").append(errors).append(" errors, ");
-    status.append(Math.round(((float)pages.get()*10)/elapsed)/10.0).append(" pages/s, ");
-    status.append(Math.round(((((float)bytes.get())*8)/1024)/elapsed)).append(" kb/s, ");
-    status.append(this.fetchQueues.getTotalSize()).append(" URLs in ");
+    status.append(Math.round((((float)pages.get())*10)/elapsed)/10.0).append(" ");
+    status.append(Math.round(((float)actualPages)*10)/10.0).append(" pages/s, ");
+    status.append(Math.round((((float)bytes.get())*8)/1024)/elapsed).append(" ");
+    status.append(Math.round(((float)actualBytes)*8)/1024).append(" kb/s, ");
+    status.append(totalSize).append(" URLs in ");
     status.append(this.fetchQueues.getQueueCount()).append(" queues");
-    context.setStatus(status.toString());
+    String toString = status.toString();
+    context.setStatus(toString);
+    LOG.info(toString);
   }
 
   @Override
@@ -787,24 +800,72 @@ extends GoraReducer<IntWritable, FetchEn
     // select a timeout that avoids a task timeout
     final long timeout = conf.getInt("mapred.task.timeout", 10*60*1000)/2;
 
+    // Used for threshold check, holds pages and bytes processed in the last sec
+    float pagesLastSec;
+    int bytesLastSec;
+
+    int throughputThresholdCurrentSequence = 0;
+
+    int throughputThresholdPages = conf.getInt("fetcher.throughput.threshold.pages", -1);
+    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); }
+    int throughputThresholdSequence = conf.getInt("fetcher.throughput.threshold.sequence", 5);
+    if (LOG.isInfoEnabled()) { 
+      LOG.info("Fetcher: throughput threshold sequence: " + throughputThresholdSequence); 
+    }
+    long throughputThresholdTimeLimit = conf.getLong("fetcher.throughput.threshold.check.after", -1);
+    
     do {                                          // wait for threads to exit
+      pagesLastSec = pages.get();
+      bytesLastSec = (int)bytes.get();
+      final int secondsToSleep = 5;
       try {
-        Thread.sleep(10000);
-      } catch (final InterruptedException e) {}
+        Thread.sleep(secondsToSleep * 1000);
+      } catch (InterruptedException e) {}
 
-      context.progress();
-      reportStatus(context);
-      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
-          + ", fetchQueues= " + fetchQueues.getQueueCount() +", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+      pagesLastSec = (pages.get() - pagesLastSec)/secondsToSleep;
+      bytesLastSec = ((int)bytes.get() - bytesLastSec)/secondsToSleep;
 
-      if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
+      int fetchQueuesTotalSize = fetchQueues.getTotalSize();
+      reportAndLogStatus(context, pagesLastSec, bytesLastSec, fetchQueuesTotalSize);
+      
+      boolean feederAlive = feeder.isAlive();
+      if (!feederAlive && fetchQueuesTotalSize < 5) {
         fetchQueues.dump();
       }
       
       // check timelimit
-      if (!feeder.isAlive()) {
+      if (!feederAlive) {
         int hitByTimeLimit = fetchQueues.checkTimelimit();
-        if (hitByTimeLimit != 0) context.getCounter("FetcherStatus","HitByTimeLimit-Queues").increment(hitByTimeLimit);
+        if (hitByTimeLimit != 0) {
+          context.getCounter("FetcherStatus","HitByTimeLimit-Queues").increment(hitByTimeLimit);
+        }
+      }
+      
+      // if throughput threshold is enabled
+      if (throughputThresholdTimeLimit < System.currentTimeMillis() && throughputThresholdPages != -1) {
+        // Check if we're dropping below the threshold
+        if (pagesLastSec < throughputThresholdPages) {
+          throughputThresholdCurrentSequence++;
+          LOG.warn(Integer.toString(throughputThresholdCurrentSequence) 
+              + ": dropping below configured threshold of " + Integer.toString(throughputThresholdPages) 
+              + " pages per second");
+
+          // Quit if we dropped below threshold too many times
+          if (throughputThresholdCurrentSequence > throughputThresholdSequence) {
+            LOG.warn("Dropped below threshold too many times in a row, killing!");
+
+            // Disable the threshold checker
+            throughputThresholdPages = -1;
+
+            // Empty the queues cleanly and get number of items that were dropped
+            int hitByThrougputThreshold = fetchQueues.emptyQueues();
+
+            if (hitByThrougputThreshold != 0) context.getCounter("FetcherStatus", 
+                "hitByThrougputThreshold").increment(hitByThrougputThreshold);
+          }
+        } else {
+          throughputThresholdCurrentSequence = 0;
+        }
       }
       
       // some requests seem to hang, despite all intentions