You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/09/15 19:15:20 UTC

svn commit: r289282 - /lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java

Author: cutting
Date: Thu Sep 15 10:15:16 2005
New Revision: 289282

URL: http://svn.apache.org/viewcvs?rev=289282&view=rev
Log:
Finish even when some threads hang.  Improve status reports.

Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=289282&r1=289281&r2=289282&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Thu Sep 15 10:15:16 2005
@@ -59,6 +59,7 @@
   private String segmentName;
   private int activeThreads;
   private int maxRedirect;
+  private boolean done;
 
   private long start = System.currentTimeMillis(); // start time of fetcher run
 
@@ -70,6 +71,10 @@
   private boolean parsing;
 
   private class FetcherThread extends Thread {
+    public FetcherThread() {
+      this.setDaemon(true);                       // don't hang JVM on exit
+    }
+
     public void run() {
       synchronized (Fetcher.this) {activeThreads++;} // count threads
       
@@ -82,8 +87,10 @@
             break;                                // exit
           
           try {                                   // get next entry from input
-            if (!input.next(key, datum))
+            if (!input.next(key, datum)) {
+              done = true;
               break;                              // at eof, exit
+            }
           } catch (IOException e) {
             LOG.severe("fetcher caught:"+e.toString());
             break;
@@ -125,8 +132,9 @@
                 }
                 break;
 
-              case ProtocolStatus.RETRY:          // retry
               case ProtocolStatus.EXCEPTION:
+                logError(url, status.getMessage());
+              case ProtocolStatus.RETRY:          // retry
                 output(key, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
                 break;
                 
@@ -152,7 +160,7 @@
 
             
           } catch (Throwable t) {                 // unexpected exception
-            logError(url, t);
+            logError(url, t.toString());
             output(key, datum, null, CrawlDatum.STATUS_FETCH_GONE);
             
           }
@@ -165,9 +173,8 @@
       }
     }
 
-    private void logError(String url, Throwable t) {
-      LOG.info("fetch of " + url + " failed with: " + t);
-      LOG.log(Level.FINE, "stack", t);            // stack trace
+    private void logError(String url, String message) {
+      LOG.info("fetch of " + url + " failed with: " + message);
       synchronized (Fetcher.this) {               // record failure
         errors++;
       }
@@ -225,19 +232,14 @@
   private synchronized void updateStatus(int bytesInPage) throws IOException {
     pages++;
     bytes += bytesInPage;
+  }
 
-    if ((pages % 100) == 0) {             // show status every 100pp
-      long elapsed = (System.currentTimeMillis() - start)/1000;
-      String line1 =
-        pages+" pages, "+errors+" errors, "+bytes+" bytes, "+elapsed+" secs";
-      String line2 = 
-        + ((float)pages)/elapsed+" pages/s, "
-        + ((((float)bytes)*8)/1024)/elapsed+" kb/s, "
-        + ((float)bytes)/pages+" bytes/page";
-      LOG.info( "status: "+line1);
-      LOG.info( "status: "+line2);
-      reporter.setStatus(line2);
-    }
+  private synchronized void reportStatus() throws IOException {
+    long elapsed = (System.currentTimeMillis() - start)/1000;
+    reporter.setStatus
+      (pages+" pages, "+errors+" errors, "
+       + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
+       + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ");
   }
 
   public void configure(JobConf job) {
@@ -266,7 +268,7 @@
     this.input = input;
     this.output = output;
     this.reporter = reporter;
-			
+
     this.maxRedirect = getConf().getInt("http.redirect.max", 3);
     
     int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
@@ -278,6 +280,24 @@
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {}
+
+      reportStatus();
+
+      // some threads seem to hang, despite all intentions
+      if (done) {                                 // last entry read
+        long doneTime = System.currentTimeMillis();
+        long timeout = getConf().getLong("http.timeout", 10000) * 10;
+        while (activeThreads > 0
+               && System.currentTimeMillis()-doneTime < timeout) {
+          try {
+            Thread.sleep(1000);                   // wait for completion
+          } catch (InterruptedException e) {}
+        }
+        if (activeThreads > 0) {                  // abort after timeout
+          LOG.warning("Aborting with "+activeThreads+" hung threads.");
+          return;
+        }
+      }
 
     } while (activeThreads > 0);