You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/09/15 19:15:20 UTC
svn commit: r289282 -
/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
Author: cutting
Date: Thu Sep 15 10:15:16 2005
New Revision: 289282
URL: http://svn.apache.org/viewcvs?rev=289282&view=rev
Log:
Finish even when some threads hang. Improve status reports.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java?rev=289282&r1=289281&r2=289282&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/crawl/Fetcher.java Thu Sep 15 10:15:16 2005
@@ -59,6 +59,7 @@
private String segmentName;
private int activeThreads;
private int maxRedirect;
+ private boolean done;
private long start = System.currentTimeMillis(); // start time of fetcher run
@@ -70,6 +71,10 @@
private boolean parsing;
private class FetcherThread extends Thread {
+ public FetcherThread() {
+ this.setDaemon(true); // don't hang JVM on exit
+ }
+
public void run() {
synchronized (Fetcher.this) {activeThreads++;} // count threads
@@ -82,8 +87,10 @@
break; // exit
try { // get next entry from input
- if (!input.next(key, datum))
+ if (!input.next(key, datum)) {
+ done = true;
break; // at eof, exit
+ }
} catch (IOException e) {
LOG.severe("fetcher caught:"+e.toString());
break;
@@ -125,8 +132,9 @@
}
break;
- case ProtocolStatus.RETRY: // retry
case ProtocolStatus.EXCEPTION:
+ logError(url, status.getMessage());
+ case ProtocolStatus.RETRY: // retry
output(key, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
break;
@@ -152,7 +160,7 @@
} catch (Throwable t) { // unexpected exception
- logError(url, t);
+ logError(url, t.toString());
output(key, datum, null, CrawlDatum.STATUS_FETCH_GONE);
}
@@ -165,9 +173,8 @@
}
}
- private void logError(String url, Throwable t) {
- LOG.info("fetch of " + url + " failed with: " + t);
- LOG.log(Level.FINE, "stack", t); // stack trace
+ private void logError(String url, String message) {
+ LOG.info("fetch of " + url + " failed with: " + message);
synchronized (Fetcher.this) { // record failure
errors++;
}
@@ -225,19 +232,14 @@
private synchronized void updateStatus(int bytesInPage) throws IOException {
pages++;
bytes += bytesInPage;
+ }
- if ((pages % 100) == 0) { // show status every 100pp
- long elapsed = (System.currentTimeMillis() - start)/1000;
- String line1 =
- pages+" pages, "+errors+" errors, "+bytes+" bytes, "+elapsed+" secs";
- String line2 =
- + ((float)pages)/elapsed+" pages/s, "
- + ((((float)bytes)*8)/1024)/elapsed+" kb/s, "
- + ((float)bytes)/pages+" bytes/page";
- LOG.info( "status: "+line1);
- LOG.info( "status: "+line2);
- reporter.setStatus(line2);
- }
+ private synchronized void reportStatus() throws IOException {
+ long elapsed = (System.currentTimeMillis() - start)/1000;
+ reporter.setStatus
+ (pages+" pages, "+errors+" errors, "
+ + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
+ + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ");
}
public void configure(JobConf job) {
@@ -266,7 +268,7 @@
this.input = input;
this.output = output;
this.reporter = reporter;
-
+
this.maxRedirect = getConf().getInt("http.redirect.max", 3);
int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
@@ -278,6 +280,24 @@
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
+
+ reportStatus();
+
+ // some threads seem to hang, despite all intentions
+ if (done) { // last entry read
+ long doneTime = System.currentTimeMillis();
+ long timeout = getConf().getLong("http.timeout", 10000) * 10;
+ while (activeThreads > 0
+ && System.currentTimeMillis()-doneTime < timeout) {
+ try {
+ Thread.sleep(1000); // wait for completion
+ } catch (InterruptedException e) {}
+ }
+ if (activeThreads > 0) { // abort after timeout
+ LOG.warning("Aborting with "+activeThreads+" hung threads.");
+ return;
+ }
+ }
} while (activeThreads > 0);