You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2015/01/29 06:39:03 UTC

svn commit: r1655526 [4/26] - in /nutch/trunk: ./ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/ src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/metadata/ src/java/org/apache/nutch/net/ src/java/org/apache/nutch/net/pr...

Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Thu Jan 29 05:38:59 2015
@@ -55,37 +55,40 @@ import crawlercommons.robots.BaseRobotRu
 
 /**
  * A queue-based fetcher.
- *
- * <p>This fetcher uses a well-known model of one producer (a QueueFeeder)
- * and many consumers (FetcherThread-s).
- *
- * <p>QueueFeeder reads input fetchlists and
- * populates a set of FetchItemQueue-s, which hold FetchItem-s that
- * describe the items to be fetched. There are as many queues as there are unique
- * hosts, but at any given time the total number of fetch items in all queues
- * is less than a fixed number (currently set to a multiple of the number of
- * threads).
- *
- * <p>As items are consumed from the queues, the QueueFeeder continues to add new
+ * 
+ * <p>
+ * This fetcher uses a well-known model of one producer (a QueueFeeder) and many
+ * consumers (FetcherThread-s).
+ * 
+ * <p>
+ * QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s,
+ * which hold FetchItem-s that describe the items to be fetched. There are as
+ * many queues as there are unique hosts, but at any given time the total number
+ * of fetch items in all queues is less than a fixed number (currently set to a
+ * multiple of the number of threads).
+ * 
+ * <p>
+ * As items are consumed from the queues, the QueueFeeder continues to add new
  * input items, so that their total count stays fixed (FetcherThread-s may also
  * add new items to the queues e.g. as a results of redirection) - until all
  * input items are exhausted, at which point the number of items in the queues
  * begins to decrease. When this number reaches 0 fetcher will finish.
- *
- * <p>This fetcher implementation handles per-host blocking itself, instead
- * of delegating this work to protocol-specific plugins.
- * Each per-host queue handles its own "politeness" settings, such as the
- * maximum number of concurrent requests and crawl delay between consecutive
- * requests - and also a list of requests in progress, and the time the last
- * request was finished. As FetcherThread-s ask for new items to be fetched,
- * queues may return eligible items or null if for "politeness" reasons this
- * host's queue is not yet ready.
- *
- * <p>If there are still unfetched items in the queues, but none of the items
- * are ready, FetcherThread-s will spin-wait until either some items become
+ * 
+ * <p>
+ * This fetcher implementation handles per-host blocking itself, instead of
+ * delegating this work to protocol-specific plugins. Each per-host queue
+ * handles its own "politeness" settings, such as the maximum number of
+ * concurrent requests and crawl delay between consecutive requests - and also a
+ * list of requests in progress, and the time the last request was finished. As
+ * FetcherThread-s ask for new items to be fetched, queues may return eligible
+ * items or null if for "politeness" reasons this host's queue is not yet ready.
+ * 
+ * <p>
+ * If there are still unfetched items in the queues, but none of the items are
+ * ready, FetcherThread-s will spin-wait until either some items become
  * available, or a timeout is reached (at which point the Fetcher will abort,
  * assuming the task is hung).
- *
+ * 
  * @author Andrzej Bialecki
  */
 public class Fetcher extends Configured implements Tool,
@@ -99,16 +102,16 @@ public class Fetcher extends Configured
 
   public static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
 
-  public static class InputFormat extends SequenceFileInputFormat<Text, CrawlDatum> {
+  public static class InputFormat extends
+      SequenceFileInputFormat<Text, CrawlDatum> {
     /** Don't split inputs, to keep things polite. */
-    public InputSplit[] getSplits(JobConf job, int nSplits)
-      throws IOException {
+    public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException {
       FileStatus[] files = listStatus(job);
       FileSplit[] splits = new FileSplit[files.length];
       for (int i = 0; i < files.length; i++) {
         FileStatus cur = files[i];
-        splits[i] = new FileSplit(cur.getPath(), 0,
-            cur.getLen(), (String[])null);
+        splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(),
+            (String[]) null);
       }
       return splits;
     }
@@ -124,8 +127,8 @@ public class Fetcher extends Configured
   private long start = System.currentTimeMillis(); // start time of fetcher run
   private AtomicLong lastRequestStart = new AtomicLong(start);
 
-  private AtomicLong bytes = new AtomicLong(0);        // total bytes fetched
-  private AtomicInteger pages = new AtomicInteger(0);  // total pages fetched
+  private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
+  private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
   private AtomicInteger errors = new AtomicInteger(0); // total pages errored
 
   private boolean storingContent;
@@ -149,7 +152,8 @@ public class Fetcher extends Configured
       this(url, u, datum, queueID, 0);
     }
 
-    public FetchItem(Text url, URL u, CrawlDatum datum, String queueID, int outlinkDepth) {
+    public FetchItem(Text url, URL u, CrawlDatum datum, String queueID,
+        int outlinkDepth) {
       this.url = url;
       this.u = u;
       this.datum = datum;
@@ -157,15 +161,17 @@ public class Fetcher extends Configured
       this.outlinkDepth = outlinkDepth;
     }
 
-    /** Create an item. Queue id will be created based on <code>queueMode</code>
-     * argument, either as a protocol + hostname pair, protocol + IP
-     * address pair or protocol+domain pair.
+    /**
+     * Create an item. Queue id will be created based on <code>queueMode</code>
+     * argument, either as a protocol + hostname pair, protocol + IP address
+     * pair or protocol+domain pair.
      */
-    public static FetchItem create(Text url, CrawlDatum datum,  String queueMode) {
+    public static FetchItem create(Text url, CrawlDatum datum, String queueMode) {
       return create(url, datum, queueMode, 0);
     }
 
-    public static FetchItem create(Text url, CrawlDatum datum,  String queueMode, int outlinkDepth) {
+    public static FetchItem create(Text url, CrawlDatum datum,
+        String queueMode, int outlinkDepth) {
       String queueID;
       URL u = null;
       try {
@@ -185,19 +191,18 @@ public class Fetcher extends Configured
           LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
           return null;
         }
-      }
-      else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)){
+      } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) {
         key = URLUtil.getDomainName(u);
         if (key == null) {
-          LOG.warn("Unknown domain for url: " + url + ", using URL string as key");
-          key=u.toExternalForm();
+          LOG.warn("Unknown domain for url: " + url
+              + ", using URL string as key");
+          key = u.toExternalForm();
         }
-      }
-      else {
+      } else {
         key = u.getHost();
         if (key == null) {
           LOG.warn("Unknown host for url: " + url + ", using URL string as key");
-          key=u.toExternalForm();
+          key = u.toExternalForm();
         }
       }
       queueID = proto + "://" + key.toLowerCase();
@@ -222,13 +227,14 @@ public class Fetcher extends Configured
   }
 
   /**
-   * This class handles FetchItems which come from the same host ID (be it
-   * a proto/hostname or proto/IP pair). It also keeps track of requests in
+   * This class handles FetchItems which come from the same host ID (be it a
+   * proto/hostname or proto/IP pair). It also keeps track of requests in
    * progress and elapsed time between requests.
    */
   private static class FetchItemQueue {
-    List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
-    AtomicInteger  inProgress = new AtomicInteger();
+    List<FetchItem> queue = Collections
+        .synchronizedList(new LinkedList<FetchItem>());
+    AtomicInteger inProgress = new AtomicInteger();
     AtomicLong nextFetchTime = new AtomicLong();
     AtomicInteger exceptionCounter = new AtomicInteger();
     long crawlDelay;
@@ -236,7 +242,8 @@ public class Fetcher extends Configured
     int maxThreads;
     Configuration conf;
 
-    public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
+    public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
+        long minCrawlDelay) {
       this.conf = conf;
       this.maxThreads = maxThreads;
       this.crawlDelay = crawlDelay;
@@ -271,26 +278,33 @@ public class Fetcher extends Configured
     }
 
     public void addFetchItem(FetchItem it) {
-      if (it == null) return;
+      if (it == null)
+        return;
       queue.add(it);
     }
 
     public void addInProgressFetchItem(FetchItem it) {
-      if (it == null) return;
+      if (it == null)
+        return;
       inProgress.incrementAndGet();
     }
 
     public FetchItem getFetchItem() {
-      if (inProgress.get() >= maxThreads) return null;
+      if (inProgress.get() >= maxThreads)
+        return null;
       long now = System.currentTimeMillis();
-      if (nextFetchTime.get() > now) return null;
+      if (nextFetchTime.get() > now)
+        return null;
       FetchItem it = null;
-      if (queue.size() == 0) return null;
+      if (queue.size() == 0)
+        return null;
       try {
         it = queue.remove(0);
         inProgress.incrementAndGet();
       } catch (Exception e) {
-        LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
+        LOG.error(
+            "Cannot remove FetchItem from queue or cannot add it to inProgress queue",
+            e);
       }
       return it;
     }
@@ -314,7 +328,8 @@ public class Fetcher extends Configured
 
     private void setEndTime(long endTime, boolean asap) {
       if (!asap)
-        nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
+        nextFetchTime.set(endTime
+            + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
       else
         nextFetchTime.set(endTime);
     }
@@ -346,17 +361,21 @@ public class Fetcher extends Configured
       this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
       queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
       // check that the mode is known
-      if (!queueMode.equals(QUEUE_MODE_IP) && !queueMode.equals(QUEUE_MODE_DOMAIN)
+      if (!queueMode.equals(QUEUE_MODE_IP)
+          && !queueMode.equals(QUEUE_MODE_DOMAIN)
           && !queueMode.equals(QUEUE_MODE_HOST)) {
-        LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost");
+        LOG.error("Unknown partition mode : " + queueMode
+            + " - forcing to byHost");
         queueMode = QUEUE_MODE_HOST;
       }
-      LOG.info("Using queue mode : "+queueMode);
+      LOG.info("Using queue mode : " + queueMode);
 
       this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
-      this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+      this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
+          0.0f) * 1000);
       this.timelimit = conf.getLong("fetcher.timelimit", -1);
-      this.maxExceptionsPerQueue = conf.getInt("fetcher.max.exceptions.per.queue", -1);
+      this.maxExceptionsPerQueue = conf.getInt(
+          "fetcher.max.exceptions.per.queue", -1);
     }
 
     public int getTotalSize() {
@@ -369,7 +388,8 @@ public class Fetcher extends Configured
 
     public void addFetchItem(Text url, CrawlDatum datum) {
       FetchItem it = FetchItem.create(url, datum, queueMode);
-      if (it != null) addFetchItem(it);
+      if (it != null)
+        addFetchItem(it);
     }
 
     public synchronized void addFetchItem(FetchItem it) {
@@ -402,8 +422,8 @@ public class Fetcher extends Configured
     }
 
     public synchronized FetchItem getFetchItem() {
-      Iterator<Map.Entry<String, FetchItemQueue>> it =
-        queues.entrySet().iterator();
+      Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
+          .iterator();
       while (it.hasNext()) {
         FetchItemQueue fiq = it.next().getValue();
         // reap empty queues
@@ -431,7 +451,8 @@ public class Fetcher extends Configured
         // there might also be a case where totalsize !=0 but number of queues
         // == 0
         // in which case we simply force it to 0 to avoid blocking
-        if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+        if (totalSize.get() != 0 && queues.size() == 0)
+          totalSize.set(0);
       }
       return count;
     }
@@ -442,7 +463,8 @@ public class Fetcher extends Configured
 
       for (String id : queues.keySet()) {
         FetchItemQueue fiq = queues.get(id);
-        if (fiq.getQueueSize() == 0) continue;
+        if (fiq.getQueueSize() == 0)
+          continue;
         LOG.info("* queue: " + id + " >> dropping! ");
         int deleted = fiq.emptyQueue();
         for (int i = 0; i < deleted; i++) {
@@ -457,7 +479,7 @@ public class Fetcher extends Configured
     /**
      * Increment the exception counter of a queue in case of an exception e.g.
      * timeout; when higher than a given threshold simply empty the queue.
-     *
+     * 
      * @param queueid
      * @return number of purged items
      */
@@ -470,7 +492,7 @@ public class Fetcher extends Configured
         return 0;
       }
       int excCount = fiq.incrementExceptionCounter();
-      if (maxExceptionsPerQueue!= -1 && excCount >= maxExceptionsPerQueue) {
+      if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
         // too many exceptions for items in this queue - purge it
         int deleted = fiq.emptyQueue();
         LOG.info("* queue: " + queueid + " >> removed " + deleted
@@ -483,11 +505,11 @@ public class Fetcher extends Configured
       return 0;
     }
 
-
     public synchronized void dump() {
       for (String id : queues.keySet()) {
         FetchItemQueue fiq = queues.get(id);
-        if (fiq.getQueueSize() == 0) continue;
+        if (fiq.getQueueSize() == 0)
+          continue;
         LOG.info("* queue: " + id);
         fiq.dump();
       }
@@ -495,8 +517,8 @@ public class Fetcher extends Configured
   }
 
   /**
-   * This class feeds the queues with input items, and re-fills them as
-   * items are consumed by FetcherThread-s.
+   * This class feeds the queues with input items, and re-fills them as items
+   * are consumed by FetcherThread-s.
    */
   private static class QueueFeeder extends Thread {
     private RecordReader<Text, CrawlDatum> reader;
@@ -541,7 +563,9 @@ public class Fetcher extends Configured
           // queues are full - spin-wait until they have some free space
           try {
             Thread.sleep(1000);
-          } catch (Exception e) {};
+          } catch (Exception e) {
+          }
+          ;
           continue;
         } else {
           LOG.debug("-feeding " + feed + " input urls ...");
@@ -562,8 +586,8 @@ public class Fetcher extends Configured
           }
         }
       }
-      LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :"
-          + timelimitcount);
+      LOG.info("QueueFeeder finished: total " + cnt
+          + " records + hit by time limit :" + timelimitcount);
     }
   }
 
@@ -595,12 +619,12 @@ public class Fetcher extends Configured
 
     private int outlinksDepthDivisor;
     private boolean skipTruncated;
-    
+
     private boolean halted = false;
 
     public FetcherThread(Configuration conf) {
-      this.setDaemon(true);                       // don't hang JVM on exit
-      this.setName("FetcherThread");              // use an informative name
+      this.setDaemon(true); // don't hang JVM on exit
+      this.setName("FetcherThread"); // use an informative name
       this.conf = conf;
       this.urlFilters = new URLFilters(conf);
       this.scfilters = new ScoringFilters(conf);
@@ -609,26 +633,33 @@ public class Fetcher extends Configured
       this.protocolFactory = new ProtocolFactory(conf);
       this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
       this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
-      queueMode = conf.get("fetcher.queue.mode", FetchItemQueues.QUEUE_MODE_HOST);
+      queueMode = conf.get("fetcher.queue.mode",
+          FetchItemQueues.QUEUE_MODE_HOST);
       // check that the mode is known
-      if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP) && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
+      if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP)
+          && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
           && !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) {
-        LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost");
+        LOG.error("Unknown partition mode : " + queueMode
+            + " - forcing to byHost");
         queueMode = FetchItemQueues.QUEUE_MODE_HOST;
       }
-      LOG.info("Using queue mode : "+queueMode);
+      LOG.info("Using queue mode : " + queueMode);
       this.maxRedirect = conf.getInt("http.redirect.max", 3);
-      this.ignoreExternalLinks =
-        conf.getBoolean("db.ignore.external.links", false);
+      this.ignoreExternalLinks = conf.getBoolean("db.ignore.external.links",
+          false);
 
       maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
-      maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE : maxOutlinksPerPage;
+      maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE
+          : maxOutlinksPerPage;
       interval = conf.getInt("db.fetch.interval.default", 2592000);
       ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
       maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1);
-      outlinksIgnoreExternal = conf.getBoolean("fetcher.follow.outlinks.ignore.external", false);
-      maxOutlinkDepthNumLinks = conf.getInt("fetcher.follow.outlinks.num.links", 4);
-      outlinksDepthDivisor = conf.getInt("fetcher.follow.outlinks.depth.divisor", 2);
+      outlinksIgnoreExternal = conf.getBoolean(
+          "fetcher.follow.outlinks.ignore.external", false);
+      maxOutlinkDepthNumLinks = conf.getInt(
+          "fetcher.follow.outlinks.num.links", 4);
+      outlinksDepthDivisor = conf.getInt(
+          "fetcher.follow.outlinks.depth.divisor", 2);
     }
 
     @SuppressWarnings("fallthrough")
@@ -645,7 +676,7 @@ public class Fetcher extends Configured
             fit = null;
             return;
           }
-          
+
           fit = fetchQueues.getFetchItem();
           if (fit == null) {
             if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
@@ -654,8 +685,9 @@ public class Fetcher extends Configured
               spinWaiting.incrementAndGet();
               try {
                 Thread.sleep(500);
-              } catch (Exception e) {}
-                spinWaiting.decrementAndGet();
+              } catch (Exception e) {
+              }
+              spinWaiting.decrementAndGet();
               continue;
             } else {
               // all done, finish this thread
@@ -664,8 +696,8 @@ public class Fetcher extends Configured
             }
           }
           lastRequestStart.set(System.currentTimeMillis());
-          Text reprUrlWritable =
-            (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
+          Text reprUrlWritable = (Text) fit.datum.getMetaData().get(
+              Nutch.WRITABLE_REPR_URL_KEY);
           if (reprUrlWritable == null) {
             reprUrl = fit.url.toString();
           } else {
@@ -677,14 +709,16 @@ public class Fetcher extends Configured
             redirectCount = 0;
             do {
               if (LOG.isInfoEnabled()) {
-                LOG.info("fetching " + fit.url + " (queue crawl delay=" + 
-                         fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay + "ms)"); 
+                LOG.info("fetching " + fit.url + " (queue crawl delay="
+                    + fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay
+                    + "ms)");
               }
               if (LOG.isDebugEnabled()) {
                 LOG.debug("redirectCount=" + redirectCount);
               }
               redirecting = false;
-              Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
+              Protocol protocol = this.protocolFactory.getProtocol(fit.url
+                  .toString());
               BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
               if (!rules.isAllowed(fit.u.toString())) {
                 // unblock
@@ -692,7 +726,9 @@ public class Fetcher extends Configured
                 if (LOG.isDebugEnabled()) {
                   LOG.debug("Denied by robots.txt: " + fit.url);
                 }
-                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
+                output(fit.url, fit.datum, null,
+                    ProtocolStatus.STATUS_ROBOTS_DENIED,
+                    CrawlDatum.STATUS_FETCH_GONE);
                 reporter.incrCounter("FetcherStatus", "robots_denied", 1);
                 continue;
               }
@@ -700,19 +736,27 @@ public class Fetcher extends Configured
                 if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
                   // unblock
                   fetchQueues.finishFetchItem(fit, true);
-                  LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
-                  output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
-                  reporter.incrCounter("FetcherStatus", "robots_denied_maxcrawldelay", 1);
+                  LOG.debug("Crawl-Delay for " + fit.url + " too long ("
+                      + rules.getCrawlDelay() + "), skipping");
+                  output(fit.url, fit.datum, null,
+                      ProtocolStatus.STATUS_ROBOTS_DENIED,
+                      CrawlDatum.STATUS_FETCH_GONE);
+                  reporter.incrCounter("FetcherStatus",
+                      "robots_denied_maxcrawldelay", 1);
                   continue;
                 } else {
-                  FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+                  FetchItemQueue fiq = fetchQueues
+                      .getFetchItemQueue(fit.queueID);
                   fiq.crawlDelay = rules.getCrawlDelay();
                   if (LOG.isDebugEnabled()) {
-                    LOG.info("Crawl delay for queue: " + fit.queueID + " is set to " + fiq.crawlDelay + " as per robots.txt. url: " + fit.url);
+                    LOG.info("Crawl delay for queue: " + fit.queueID
+                        + " is set to " + fiq.crawlDelay
+                        + " as per robots.txt. url: " + fit.url);
                   }
                 }
               }
-              ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
+              ProtocolOutput output = protocol.getProtocolOutput(fit.url,
+                  fit.datum);
               ProtocolStatus status = output.getStatus();
               Content content = output.getContent();
               ParseStatus pstatus = null;
@@ -723,32 +767,31 @@ public class Fetcher extends Configured
 
               reporter.incrCounter("FetcherStatus", status.getName(), 1);
 
-              switch(status.getCode()) {
+              switch (status.getCode()) {
 
               case ProtocolStatus.WOULDBLOCK:
                 // retry ?
                 fetchQueues.addFetchItem(fit);
                 break;
 
-              case ProtocolStatus.SUCCESS:        // got a page
-                pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
+              case ProtocolStatus.SUCCESS: // got a page
+                pstatus = output(fit.url, fit.datum, content, status,
+                    CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
                 updateStatus(content.getContent().length);
-                if (pstatus != null && pstatus.isSuccess() &&
-                        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+                if (pstatus != null && pstatus.isSuccess()
+                    && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                   String newUrl = pstatus.getMessage();
                   int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
-                  Text redirUrl =
-                    handleRedirect(fit.url, fit.datum,
-                                   urlString, newUrl,
-                                   refreshTime < Fetcher.PERM_REFRESH_TIME,
-                                   Fetcher.CONTENT_REDIR);
+                  Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+                      newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME,
+                      Fetcher.CONTENT_REDIR);
                   if (redirUrl != null) {
                     queueRedirect(redirUrl, fit);
                   }
                 }
                 break;
 
-              case ProtocolStatus.MOVED:         // redirect
+              case ProtocolStatus.MOVED: // redirect
               case ProtocolStatus.TEMP_MOVED:
                 int code;
                 boolean temp;
@@ -761,10 +804,8 @@ public class Fetcher extends Configured
                 }
                 output(fit.url, fit.datum, content, status, code);
                 String newUrl = status.getMessage();
-                Text redirUrl =
-                  handleRedirect(fit.url, fit.datum,
-                                 urlString, newUrl, temp,
-                                 Fetcher.PROTOCOL_REDIR);
+                Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+                    newUrl, temp, Fetcher.PROTOCOL_REDIR);
                 if (redirUrl != null) {
                   queueRedirect(redirUrl, fit);
                 } else {
@@ -775,31 +816,37 @@ public class Fetcher extends Configured
 
               case ProtocolStatus.EXCEPTION:
                 logError(fit.url, status.getMessage());
-                int killedURLs = fetchQueues.checkExceptionThreshold(fit.getQueueID());
-                if (killedURLs!=0)
-                   reporter.incrCounter("FetcherStatus", "AboveExceptionThresholdInQueue", killedURLs);
+                int killedURLs = fetchQueues.checkExceptionThreshold(fit
+                    .getQueueID());
+                if (killedURLs != 0)
+                  reporter.incrCounter("FetcherStatus",
+                      "AboveExceptionThresholdInQueue", killedURLs);
                 /* FALLTHROUGH */
-              case ProtocolStatus.RETRY:          // retry
+              case ProtocolStatus.RETRY: // retry
               case ProtocolStatus.BLOCKED:
-                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
+                output(fit.url, fit.datum, null, status,
+                    CrawlDatum.STATUS_FETCH_RETRY);
                 break;
 
-              case ProtocolStatus.GONE:           // gone
+              case ProtocolStatus.GONE: // gone
               case ProtocolStatus.NOTFOUND:
               case ProtocolStatus.ACCESS_DENIED:
               case ProtocolStatus.ROBOTS_DENIED:
-                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
+                output(fit.url, fit.datum, null, status,
+                    CrawlDatum.STATUS_FETCH_GONE);
                 break;
 
               case ProtocolStatus.NOTMODIFIED:
-                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
+                output(fit.url, fit.datum, null, status,
+                    CrawlDatum.STATUS_FETCH_NOTMODIFIED);
                 break;
 
               default:
                 if (LOG.isWarnEnabled()) {
                   LOG.warn("Unknown ProtocolStatus: " + status.getCode());
                 }
-                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
+                output(fit.url, fit.datum, null, status,
+                    CrawlDatum.STATUS_FETCH_RETRY);
               }
 
               if (redirecting && redirectCount > maxRedirect) {
@@ -807,34 +854,38 @@ public class Fetcher extends Configured
                 if (LOG.isInfoEnabled()) {
                   LOG.info(" - redirect count exceeded " + fit.url);
                 }
-                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE);
+                output(fit.url, fit.datum, null,
+                    ProtocolStatus.STATUS_REDIR_EXCEEDED,
+                    CrawlDatum.STATUS_FETCH_GONE);
               }
 
             } while (redirecting && (redirectCount <= maxRedirect));
 
-          } catch (Throwable t) {                 // unexpected exception
+          } catch (Throwable t) { // unexpected exception
             // unblock
             fetchQueues.finishFetchItem(fit);
             logError(fit.url, StringUtils.stringifyException(t));
-            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY);
+            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
+                CrawlDatum.STATUS_FETCH_RETRY);
           }
         }
 
       } catch (Throwable e) {
         if (LOG.isErrorEnabled()) {
-          LOG.error("fetcher caught:"+e.toString());
+          LOG.error("fetcher caught:" + e.toString());
         }
       } finally {
-        if (fit != null) fetchQueues.finishFetchItem(fit);
+        if (fit != null)
+          fetchQueues.finishFetchItem(fit);
         activeThreads.decrementAndGet(); // count threads
-        LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
+        LOG.info("-finishing thread " + getName() + ", activeThreads="
+            + activeThreads);
       }
     }
 
-    private Text handleRedirect(Text url, CrawlDatum datum,
-                                String urlString, String newUrl,
-                                boolean temp, String redirType)
-    throws MalformedURLException, URLFilterException {
+    private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
+        String newUrl, boolean temp, String redirType)
+        throws MalformedURLException, URLFilterException {
       newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
       newUrl = urlFilters.filter(newUrl);
 
@@ -844,13 +895,14 @@ public class Fetcher extends Configured
           String newHost = new URL(newUrl).getHost().toLowerCase();
           if (!origHost.equals(newHost)) {
             if (LOG.isDebugEnabled()) {
-              LOG.debug(" - ignoring redirect " + redirType + " from " +
-                          urlString + " to " + newUrl +
-                          " because external links are ignored");
+              LOG.debug(" - ignoring redirect " + redirType + " from "
+                  + urlString + " to " + newUrl
+                  + " because external links are ignored");
             }
             return null;
           }
-        } catch (MalformedURLException e) { }
+        } catch (MalformedURLException e) {
+        }
       }
 
       if (newUrl != null && !newUrl.equals(urlString)) {
@@ -860,13 +912,13 @@ public class Fetcher extends Configured
           redirecting = true;
           redirectCount++;
           if (LOG.isDebugEnabled()) {
-            LOG.debug(" - " + redirType + " redirect to " +
-                url + " (fetching now)");
+            LOG.debug(" - " + redirType + " redirect to " + url
+                + " (fetching now)");
           }
           return url;
         } else {
           CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
-              datum.getFetchInterval(),datum.getScore());
+              datum.getFetchInterval(), datum.getScore());
           // transfer existing metadata
           newDatum.getMetaData().putAll(datum.getMetaData());
           try {
@@ -880,21 +932,22 @@ public class Fetcher extends Configured
           }
           output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
           if (LOG.isDebugEnabled()) {
-            LOG.debug(" - " + redirType + " redirect to " +
-                url + " (fetching later)");
+            LOG.debug(" - " + redirType + " redirect to " + url
+                + " (fetching later)");
           }
           return null;
         }
       } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(" - " + redirType + " redirect skipped: " +
-              (newUrl != null ? "to same url" : "filtered"));
+          LOG.debug(" - " + redirType + " redirect skipped: "
+              + (newUrl != null ? "to same url" : "filtered"));
         }
         return null;
       }
     }
 
-    private void queueRedirect(Text redirUrl, FetchItem fit) throws ScoringFilterException {
+    private void queueRedirect(Text redirUrl, FetchItem fit)
+        throws ScoringFilterException {
       CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
           fit.datum.getFetchInterval(), fit.datum.getScore());
       // transfer all existing metadata to the redirect
@@ -906,13 +959,13 @@ public class Fetcher extends Configured
       }
       fit = FetchItem.create(redirUrl, newDatum, queueMode);
       if (fit != null) {
-        FetchItemQueue fiq =
-          fetchQueues.getFetchItemQueue(fit.queueID);
+        FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
         fiq.addInProgressFetchItem(fit);
       } else {
         // stop redirecting
         redirecting = false;
-        reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", 1);
+        reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect",
+            1);
       }
     }
 
@@ -923,26 +976,29 @@ public class Fetcher extends Configured
       errors.incrementAndGet();
     }
 
-    private ParseStatus output(Text key, CrawlDatum datum,
-                        Content content, ProtocolStatus pstatus, int status) {
+    private ParseStatus output(Text key, CrawlDatum datum, Content content,
+        ProtocolStatus pstatus, int status) {
 
       return output(key, datum, content, pstatus, status, 0);
     }
 
-    private ParseStatus output(Text key, CrawlDatum datum,
-                        Content content, ProtocolStatus pstatus, int status, int outlinkDepth) {
+    private ParseStatus output(Text key, CrawlDatum datum, Content content,
+        ProtocolStatus pstatus, int status, int outlinkDepth) {
 
       datum.setStatus(status);
       datum.setFetchTime(System.currentTimeMillis());
-      if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
-      
+      if (pstatus != null)
+        datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+
       ParseResult parseResult = null;
       if (content != null) {
         Metadata metadata = content.getMetadata();
-        
+
         // store the guessed content type in the crawldatum
-        if (content.getContentType() != null) datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE), new Text(content.getContentType()));
-        
+        if (content.getContentType() != null)
+          datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
+              new Text(content.getContentType()));
+
         // add segment to metadata
         metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
         // add score to content metadata so that ParseSegment can pick it up.
@@ -953,29 +1009,34 @@ public class Fetcher extends Configured
             LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
           }
         }
-        /* Note: Fetcher will only follow meta-redirects coming from the
-         * original URL. */
+        /*
+         * Note: Fetcher will only follow meta-redirects coming from the
+         * original URL.
+         */
         if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
-          if (!skipTruncated || (skipTruncated && !ParseSegment.isTruncated(content))) {
+          if (!skipTruncated
+              || (skipTruncated && !ParseSegment.isTruncated(content))) {
             try {
               parseResult = this.parseUtil.parse(content);
             } catch (Exception e) {
-              LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
+              LOG.warn("Error parsing: " + key + ": "
+                  + StringUtils.stringifyException(e));
             }
           }
-  
+
           if (parseResult == null) {
-            byte[] signature =
-              SignatureFactory.getSignature(getConf()).calculate(content,
-                  new ParseStatus().getEmptyParse(conf));
+            byte[] signature = SignatureFactory.getSignature(getConf())
+                .calculate(content, new ParseStatus().getEmptyParse(conf));
             datum.setSignature(signature);
           }
         }
 
-        /* Store status code in content So we can read this value during
-         * parsing (as a separate job) and decide to parse or not.
+        /*
+         * Store status code in content So we can read this value during parsing
+         * (as a separate job) and decide to parse or not.
          */
-        content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
+        content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
+            Integer.toString(status));
       }
 
       try {
@@ -996,11 +1057,10 @@ public class Fetcher extends Configured
 
             // Calculate page signature. For non-parsing fetchers this will
             // be done in ParseSegment
-            byte[] signature =
-              SignatureFactory.getSignature(getConf()).calculate(content, parse);
+            byte[] signature = SignatureFactory.getSignature(getConf())
+                .calculate(content, parse);
             // Ensure segment name and score are in parseData metadata
-            parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
-                segmentName);
+            parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
             parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
                 StringUtil.toHexString(signature));
             // Pass fetch time to content meta
@@ -1039,7 +1099,8 @@ public class Fetcher extends Configured
             for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
               String toUrl = links[i].getToUrl();
 
-              toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl, fromHost, ignoreExternalLinks, urlFilters, normalizers);
+              toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
+                  fromHost, ignoreExternalLinks, urlFilters, normalizers);
               if (toUrl == null) {
                 continue;
               }
@@ -1052,49 +1113,57 @@ public class Fetcher extends Configured
 
             // Only process depth N outlinks
             if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
-              reporter.incrCounter("FetcherOutlinks", "outlinks_detected", outlinks.size());
+              reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
+                  outlinks.size());
 
               // Counter to limit num outlinks to follow per page
               int outlinkCounter = 0;
 
-              // Calculate variable number of outlinks by depth using the divisor (outlinks = Math.floor(divisor / depth * num.links))
-              int maxOutlinksByDepth = (int)Math.floor(outlinksDepthDivisor / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
+              // Calculate variable number of outlinks by depth using the
+              // divisor (outlinks = Math.floor(divisor / depth * num.links))
+              int maxOutlinksByDepth = (int) Math.floor(outlinksDepthDivisor
+                  / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
 
               String followUrl;
 
               // Walk over the outlinks and add as new FetchItem to the queues
               Iterator<String> iter = outlinks.iterator();
-              while(iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
+              while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
                 followUrl = iter.next();
 
                 // Check whether we'll follow external outlinks
                 if (outlinksIgnoreExternal) {
-                  if (!URLUtil.getHost(url.toString()).equals(URLUtil.getHost(followUrl))) {
+                  if (!URLUtil.getHost(url.toString()).equals(
+                      URLUtil.getHost(followUrl))) {
                     continue;
                   }
                 }
 
-                reporter.incrCounter("FetcherOutlinks", "outlinks_following", 1);
+                reporter
+                    .incrCounter("FetcherOutlinks", "outlinks_following", 1);
 
                 // Create new FetchItem with depth incremented
-                FetchItem fit = FetchItem.create(new Text(followUrl), new CrawlDatum(CrawlDatum.STATUS_LINKED, interval), queueMode, outlinkDepth + 1);
+                FetchItem fit = FetchItem.create(new Text(followUrl),
+                    new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
+                    queueMode, outlinkDepth + 1);
                 fetchQueues.addFetchItem(fit);
 
                 outlinkCounter++;
               }
             }
 
-            // Overwrite the outlinks in ParseData with the normalized and filtered set
-            parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList.size()]));
+            // Overwrite the outlinks in ParseData with the normalized and
+            // filtered set
+            parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList
+                .size()]));
 
-            output.collect(url, new NutchWritable(
-                    new ParseImpl(new ParseText(parse.getText()),
-                                  parseData, parse.isCanonical())));
+            output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+                parse.getText()), parseData, parse.isCanonical())));
           }
         }
       } catch (IOException e) {
         if (LOG.isErrorEnabled()) {
-          LOG.error("fetcher caught:"+e.toString());
+          LOG.error("fetcher caught:" + e.toString());
         }
       }
 
@@ -1102,7 +1171,8 @@ public class Fetcher extends Configured
       if (parseResult != null && !parseResult.isEmpty()) {
         Parse p = parseResult.get(content.getUrl());
         if (p != null) {
-          reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p.getData().getStatus().getMajorCode()], 1);
+          reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p
+              .getData().getStatus().getMajorCode()], 1);
           return p.getData().getStatus();
         }
       }
@@ -1116,33 +1186,39 @@ public class Fetcher extends Configured
     public synchronized boolean isHalted() {
       return halted;
     }
-    
+
   }
 
-  public Fetcher() { super(null); }
+  public Fetcher() {
+    super(null);
+  }
 
-  public Fetcher(Configuration conf) { super(conf); }
+  public Fetcher(Configuration conf) {
+    super(conf);
+  }
 
   private void updateStatus(int bytesInPage) throws IOException {
     pages.incrementAndGet();
     bytes.addAndGet(bytesInPage);
   }
 
-
-  private void reportStatus(int pagesLastSec, int bytesLastSec) throws IOException {
+  private void reportStatus(int pagesLastSec, int bytesLastSec)
+      throws IOException {
     StringBuilder status = new StringBuilder();
-    Long elapsed = new Long((System.currentTimeMillis() - start)/1000);
+    Long elapsed = new Long((System.currentTimeMillis() - start) / 1000);
 
-    float avgPagesSec =  (float) pages.get() / elapsed.floatValue();
-    long avgBytesSec =  (bytes.get() /125l) / elapsed.longValue();
+    float avgPagesSec = (float) pages.get() / elapsed.floatValue();
+    long avgBytesSec = (bytes.get() / 125l) / elapsed.longValue();
 
-    status.append(activeThreads).append(" threads (").append(spinWaiting.get()).append(" waiting), ");
+    status.append(activeThreads).append(" threads (").append(spinWaiting.get())
+        .append(" waiting), ");
     status.append(fetchQueues.getQueueCount()).append(" queues, ");
     status.append(fetchQueues.getTotalSize()).append(" URLs queued, ");
     status.append(pages).append(" pages, ").append(errors).append(" errors, ");
     status.append(String.format("%.2f", avgPagesSec)).append(" pages/s (");
     status.append(pagesLastSec).append(" last sec), ");
-    status.append(avgBytesSec).append(" kbits/s (").append((bytesLastSec / 125)).append(" last sec)");
+    status.append(avgBytesSec).append(" kbits/s (")
+        .append((bytesLastSec / 125)).append(" last sec)");
 
     reporter.setStatus(status.toString());
   }
@@ -1154,12 +1230,13 @@ public class Fetcher extends Configured
     this.storingContent = isStoringContent(job);
     this.parsing = isParsing(job);
 
-//    if (job.getBoolean("fetcher.verbose", false)) {
-//      LOG.setLevel(Level.FINE);
-//    }
+    // if (job.getBoolean("fetcher.verbose", false)) {
+    // LOG.setLevel(Level.FINE);
+    // }
   }
 
-  public void close() {}
+  public void close() {
+  }
 
   public static boolean isParsing(Configuration conf) {
     return conf.getBoolean("fetcher.parse", true);
@@ -1170,43 +1247,53 @@ public class Fetcher extends Configured
   }
 
   public void run(RecordReader<Text, CrawlDatum> input,
-      OutputCollector<Text, NutchWritable> output,
-                  Reporter reporter) throws IOException {
+      OutputCollector<Text, NutchWritable> output, Reporter reporter)
+      throws IOException {
 
     this.output = output;
     this.reporter = reporter;
     this.fetchQueues = new FetchItemQueues(getConf());
 
     int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
-    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: threads: " + threadCount);
+    }
 
     int timeoutDivisor = getConf().getInt("fetcher.threads.timeout.divisor", 2);
-    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: time-out divisor: " + timeoutDivisor); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: time-out divisor: " + timeoutDivisor);
+    }
 
-    int queueDepthMuliplier =  getConf().getInt("fetcher.queue.depth.multiplier", 50);
+    int queueDepthMuliplier = getConf().getInt(
+        "fetcher.queue.depth.multiplier", 50);
 
-    feeder = new QueueFeeder(input, fetchQueues, threadCount * queueDepthMuliplier);
-    //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
+    feeder = new QueueFeeder(input, fetchQueues, threadCount
+        * queueDepthMuliplier);
+    // feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
 
-    // the value of the time limit is either -1 or the time where it should finish
+    // the value of the time limit is either -1 or the time where it should
+    // finish
     long timelimit = getConf().getLong("fetcher.timelimit", -1);
-    if (timelimit != -1) feeder.setTimeLimit(timelimit);
+    if (timelimit != -1)
+      feeder.setTimeLimit(timelimit);
     feeder.start();
 
     // set non-blocking & no-robots mode for HTTP protocol plugins.
     getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
     getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
 
-    for (int i = 0; i < threadCount; i++) {       // spawn threads
+    for (int i = 0; i < threadCount; i++) { // spawn threads
       FetcherThread t = new FetcherThread(getConf());
       fetcherThreads.add(t);
       t.start();
     }
 
     // select a timeout that avoids a task timeout
-    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/timeoutDivisor;
+    long timeout = getConf().getInt("mapred.task.timeout", 10 * 60 * 1000)
+        / timeoutDivisor;
 
-    // Used for threshold check, holds pages and bytes processed in the last second
+    // Used for threshold check, holds pages and bytes processed in the last
+    // second
     int pagesLastSec;
     int bytesLastSec;
 
@@ -1214,57 +1301,74 @@ public class Fetcher extends Configured
     boolean throughputThresholdExceeded = false;
     int throughputThresholdNumRetries = 0;
 
-    int throughputThresholdPages = getConf().getInt("fetcher.throughput.threshold.pages", -1);
-    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages); }
-    int throughputThresholdMaxRetries = getConf().getInt("fetcher.throughput.threshold.retries", 5);
-    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: throughput threshold retries: " + throughputThresholdMaxRetries); }
-    long throughputThresholdTimeLimit = getConf().getLong("fetcher.throughput.threshold.check.after", -1);
-    
+    int throughputThresholdPages = getConf().getInt(
+        "fetcher.throughput.threshold.pages", -1);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: throughput threshold: " + throughputThresholdPages);
+    }
+    int throughputThresholdMaxRetries = getConf().getInt(
+        "fetcher.throughput.threshold.retries", 5);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: throughput threshold retries: "
+          + throughputThresholdMaxRetries);
+    }
+    long throughputThresholdTimeLimit = getConf().getLong(
+        "fetcher.throughput.threshold.check.after", -1);
+
     int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 1000;
     int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", threadCount);
-    if (maxNumThreads < threadCount){
-      LOG.info("fetcher.maxNum.threads can't be < than "+ threadCount + " : using "+threadCount+" instead");
+    if (maxNumThreads < threadCount) {
+      LOG.info("fetcher.maxNum.threads can't be < than " + threadCount
+          + " : using " + threadCount + " instead");
       maxNumThreads = threadCount;
     }
-    int bandwidthTargetCheckEveryNSecs  = getConf().getInt("fetcher.bandwidth.target.check.everyNSecs", 30);
-    if (bandwidthTargetCheckEveryNSecs < 1){
+    int bandwidthTargetCheckEveryNSecs = getConf().getInt(
+        "fetcher.bandwidth.target.check.everyNSecs", 30);
+    if (bandwidthTargetCheckEveryNSecs < 1) {
       LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : using 1 instead");
       bandwidthTargetCheckEveryNSecs = 1;
     }
-    
+
     int maxThreadsPerQueue = getConf().getInt("fetcher.threads.per.queue", 1);
-    
+
     int bandwidthTargetCheckCounter = 0;
     long bytesAtLastBWTCheck = 0l;
-    
-    do {                                          // wait for threads to exit
+
+    do { // wait for threads to exit
       pagesLastSec = pages.get();
-      bytesLastSec = (int)bytes.get();
+      bytesLastSec = (int) bytes.get();
 
       try {
         Thread.sleep(1000);
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException e) {
+      }
 
       pagesLastSec = pages.get() - pagesLastSec;
-      bytesLastSec = (int)bytes.get() - bytesLastSec;
+      bytesLastSec = (int) bytes.get() - bytesLastSec;
 
       reporter.incrCounter("FetcherStatus", "bytes_downloaded", bytesLastSec);
 
       reportStatus(pagesLastSec, bytesLastSec);
 
-      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
-          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize()+ ", fetchQueues.getQueueCount="+fetchQueues.getQueueCount());
+      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting="
+          + spinWaiting.get() + ", fetchQueues.totalSize="
+          + fetchQueues.getTotalSize() + ", fetchQueues.getQueueCount="
+          + fetchQueues.getQueueCount());
 
       if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
         fetchQueues.dump();
       }
 
       // if throughput threshold is enabled
-      if (throughputThresholdTimeLimit < System.currentTimeMillis() && throughputThresholdPages != -1) {
+      if (throughputThresholdTimeLimit < System.currentTimeMillis()
+          && throughputThresholdPages != -1) {
         // Check if we're dropping below the threshold
         if (pagesLastSec < throughputThresholdPages) {
           throughputThresholdNumRetries++;
-          LOG.warn(Integer.toString(throughputThresholdNumRetries) + ": dropping below configured threshold of " + Integer.toString(throughputThresholdPages) + " pages per second");
+          LOG.warn(Integer.toString(throughputThresholdNumRetries)
+              + ": dropping below configured threshold of "
+              + Integer.toString(throughputThresholdPages)
+              + " pages per second");
 
           // Quit if we dropped below threshold too many times
           if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
@@ -1273,42 +1377,55 @@ public class Fetcher extends Configured
             // Disable the threshold checker
             throughputThresholdPages = -1;
 
-            // Empty the queues cleanly and get number of items that were dropped
+            // Empty the queues cleanly and get number of items that were
+            // dropped
             int hitByThrougputThreshold = fetchQueues.emptyQueues();
 
-            if (hitByThrougputThreshold != 0) reporter.incrCounter("FetcherStatus",
-              "hitByThrougputThreshold", hitByThrougputThreshold);
+            if (hitByThrougputThreshold != 0)
+              reporter.incrCounter("FetcherStatus", "hitByThrougputThreshold",
+                  hitByThrougputThreshold);
           }
         }
       }
-      
+
       // adjust the number of threads if a target bandwidth has been set
-      if (targetBandwidth>0) {
-        if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs) bandwidthTargetCheckCounter++;
-        else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs){  	
-          long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)/bandwidthTargetCheckEveryNSecs;
+      if (targetBandwidth > 0) {
+        if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs)
+          bandwidthTargetCheckCounter++;
+        else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
+          long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)
+              / bandwidthTargetCheckEveryNSecs;
 
           bytesAtLastBWTCheck = bytes.get();
           bandwidthTargetCheckCounter = 0;
 
           int averageBdwPerThread = 0;
-          if (activeThreads.get()>0)
-            averageBdwPerThread = Math.round(bpsSinceLastCheck/activeThreads.get());   
+          if (activeThreads.get() > 0)
+            averageBdwPerThread = Math.round(bpsSinceLastCheck
+                / activeThreads.get());
 
-          LOG.info("averageBdwPerThread : "+(averageBdwPerThread/1000) + " kbps");
+          LOG.info("averageBdwPerThread : " + (averageBdwPerThread / 1000)
+              + " kbps");
 
-          if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0){
+          if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0) {
             // check whether it is worth doing e.g. more queues than threads
 
-            if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads.get()){
-             
+            if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads
+                .get()) {
+
               long remainingBdw = targetBandwidth - bpsSinceLastCheck;
-              int additionalThreads = Math.round(remainingBdw/averageBdwPerThread);
+              int additionalThreads = Math.round(remainingBdw
+                  / averageBdwPerThread);
               int availableThreads = maxNumThreads - activeThreads.get();
 
-              // determine the number of available threads (min between availableThreads and additionalThreads)
-              additionalThreads = (availableThreads < additionalThreads ? availableThreads:additionalThreads);
-              LOG.info("Has space for more threads ("+(bpsSinceLastCheck/1000) +" vs "+(targetBandwidth/1000)+" kbps) \t=> adding "+additionalThreads+" new threads");
+              // determine the number of available threads (min between
+              // availableThreads and additionalThreads)
+              additionalThreads = (availableThreads < additionalThreads ? availableThreads
+                  : additionalThreads);
+              LOG.info("Has space for more threads ("
+                  + (bpsSinceLastCheck / 1000) + " vs "
+                  + (targetBandwidth / 1000) + " kbps) \t=> adding "
+                  + additionalThreads + " new threads");
               // activate new threads
               for (int i = 0; i < additionalThreads; i++) {
                 FetcherThread thread = new FetcherThread(getConf());
@@ -1316,14 +1433,18 @@ public class Fetcher extends Configured
                 thread.start();
               }
             }
-          }
-          else if (bpsSinceLastCheck > targetBandwidth && averageBdwPerThread > 0){
-            // if the bandwidth we're using is greater then the expected bandwidth, we have to stop some threads
+          } else if (bpsSinceLastCheck > targetBandwidth
+              && averageBdwPerThread > 0) {
+            // if the bandwidth we're using is greater then the expected
+            // bandwidth, we have to stop some threads
             long excessBdw = bpsSinceLastCheck - targetBandwidth;
-            int excessThreads = Math.round(excessBdw/averageBdwPerThread);
-            LOG.info("Exceeding target bandwidth ("+bpsSinceLastCheck/1000 +" vs "+(targetBandwidth/1000)+" kbps). \t=> excessThreads = "+excessThreads);
+            int excessThreads = Math.round(excessBdw / averageBdwPerThread);
+            LOG.info("Exceeding target bandwidth (" + bpsSinceLastCheck / 1000
+                + " vs " + (targetBandwidth / 1000)
+                + " kbps). \t=> excessThreads = " + excessThreads);
             // keep at least one
-            if (excessThreads >= fetcherThreads.size()) excessThreads = 0;
+            if (excessThreads >= fetcherThreads.size())
+              excessThreads = 0;
             // de-activates threads
             for (int i = 0; i < excessThreads; i++) {
               FetcherThread thread = fetcherThreads.removeLast();
@@ -1336,18 +1457,20 @@ public class Fetcher extends Configured
       // check timelimit
       if (!feeder.isAlive()) {
         int hitByTimeLimit = fetchQueues.checkTimelimit();
-        if (hitByTimeLimit != 0) reporter.incrCounter("FetcherStatus",
-            "hitByTimeLimit", hitByTimeLimit);
+        if (hitByTimeLimit != 0)
+          reporter.incrCounter("FetcherStatus", "hitByTimeLimit",
+              hitByTimeLimit);
       }
 
       // some requests seem to hang, despite all intentions
       if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
         if (LOG.isWarnEnabled()) {
-          LOG.warn("Aborting with "+activeThreads+" hung threads.");
+          LOG.warn("Aborting with " + activeThreads + " hung threads.");
           for (int i = 0; i < fetcherThreads.size(); i++) {
             FetcherThread thread = fetcherThreads.get(i);
             if (thread.isAlive()) {
-              LOG.warn("Thread #" + i + " hung while processing " + thread.reprUrl);
+              LOG.warn("Thread #" + i + " hung while processing "
+                  + thread.reprUrl);
               if (LOG.isDebugEnabled()) {
                 StackTraceElement[] stack = thread.getStackTrace();
                 StringBuilder sb = new StringBuilder();
@@ -1368,8 +1491,7 @@ public class Fetcher extends Configured
 
   }
 
-  public void fetch(Path segment, int threads)
-    throws IOException {
+  public void fetch(Path segment, int threads) throws IOException {
 
     checkConfiguration();
 
@@ -1390,24 +1512,31 @@ public class Fetcher extends Configured
       getConf().setLong("fetcher.timelimit", timelimit);
     }
 
-    // Set the time limit after which the throughput threshold feature is enabled
-    timelimit = getConf().getLong("fetcher.throughput.threshold.check.after", 10);
+    // Set the time limit after which the throughput threshold feature is
+    // enabled
+    timelimit = getConf().getLong("fetcher.throughput.threshold.check.after",
+        10);
     timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
     getConf().setLong("fetcher.throughput.threshold.check.after", timelimit);
 
     int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1);
     if (maxOutlinkDepth > 0) {
-      LOG.info("Fetcher: following outlinks up to depth: " + Integer.toString(maxOutlinkDepth));
+      LOG.info("Fetcher: following outlinks up to depth: "
+          + Integer.toString(maxOutlinkDepth));
 
-      int maxOutlinkDepthNumLinks = getConf().getInt("fetcher.follow.outlinks.num.links", 4);
-      int outlinksDepthDivisor = getConf().getInt("fetcher.follow.outlinks.depth.divisor", 2);
+      int maxOutlinkDepthNumLinks = getConf().getInt(
+          "fetcher.follow.outlinks.num.links", 4);
+      int outlinksDepthDivisor = getConf().getInt(
+          "fetcher.follow.outlinks.depth.divisor", 2);
 
       int totalOutlinksToFollow = 0;
       for (int i = 0; i < maxOutlinkDepth; i++) {
-        totalOutlinksToFollow += (int)Math.floor(outlinksDepthDivisor / (i + 1) * maxOutlinkDepthNumLinks);
+        totalOutlinksToFollow += (int) Math.floor(outlinksDepthDivisor
+            / (i + 1) * maxOutlinkDepthNumLinks);
       }
 
-      LOG.info("Fetcher: maximum outlinks to follow: " + Integer.toString(totalOutlinksToFollow));
+      LOG.info("Fetcher: maximum outlinks to follow: "
+          + Integer.toString(totalOutlinksToFollow));
     }
 
     JobConf job = new NutchJob(getConf());
@@ -1419,7 +1548,8 @@ public class Fetcher extends Configured
     // for politeness, don't permit parallel execution of a single task
     job.setSpeculativeExecution(false);
 
-    FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+    FileInputFormat.addInputPath(job, new Path(segment,
+        CrawlDatum.GENERATE_DIR_NAME));
     job.setInputFormat(InputFormat.class);
 
     job.setMapRunnerClass(Fetcher.class);
@@ -1432,10 +1562,10 @@ public class Fetcher extends Configured
     JobClient.runJob(job);
 
     long end = System.currentTimeMillis();
-    LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+    LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
   }
 
-
   /** Run the fetcher. */
   public static void main(String[] args) throws Exception {
     int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(), args);
@@ -1456,9 +1586,9 @@ public class Fetcher extends Configured
     int threads = getConf().getInt("fetcher.threads.fetch", 10);
     boolean parsing = false;
 
-    for (int i = 1; i < args.length; i++) {       // parse command line
-      if (args[i].equals("-threads")) {           // found -threads option
-        threads =  Integer.parseInt(args[++i]);
+    for (int i = 1; i < args.length; i++) { // parse command line
+      if (args[i].equals("-threads")) { // found -threads option
+        threads = Integer.parseInt(args[++i]);
       }
     }
 

Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/FetcherOutputFormat.java Thu Jan 29 05:38:59 2015
@@ -48,74 +48,68 @@ public class FetcherOutputFormat impleme
   public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
     Path out = FileOutputFormat.getOutputPath(job);
     if ((out == null) && (job.getNumReduceTasks() != 0)) {
-    	throw new InvalidJobConfException(
-    			"Output directory not set in JobConf.");
+      throw new InvalidJobConfException("Output directory not set in JobConf.");
     }
     if (fs == null) {
-    	fs = out.getFileSystem(job);
+      fs = out.getFileSystem(job);
     }
     if (fs.exists(new Path(out, CrawlDatum.FETCH_DIR_NAME)))
-    	throw new IOException("Segment already fetched!");
+      throw new IOException("Segment already fetched!");
   }
 
   public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
-                                      final JobConf job,
-                                      final String name,
-                                      final Progressable progress) throws IOException {
+      final JobConf job, final String name, final Progressable progress)
+      throws IOException {
 
     Path out = FileOutputFormat.getOutputPath(job);
-    final Path fetch =
-      new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
-    final Path content =
-      new Path(new Path(out, Content.DIR_NAME), name);
-    
-    final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
-
-    final MapFile.Writer fetchOut =
-      new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
-          compType, progress);
-    
+    final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
+    final Path content = new Path(new Path(out, Content.DIR_NAME), name);
+
+    final CompressionType compType = SequenceFileOutputFormat
+        .getOutputCompressionType(job);
+
+    final MapFile.Writer fetchOut = new MapFile.Writer(job, fs,
+        fetch.toString(), Text.class, CrawlDatum.class, compType, progress);
+
     return new RecordWriter<Text, NutchWritable>() {
-        private MapFile.Writer contentOut;
-        private RecordWriter<Text, Parse> parseOut;
+      private MapFile.Writer contentOut;
+      private RecordWriter<Text, Parse> parseOut;
 
-        {
-          if (Fetcher.isStoringContent(job)) {
-            contentOut = new MapFile.Writer(job, fs, content.toString(),
-                                            Text.class, Content.class,
-                                            compType, progress);
-          }
-
-          if (Fetcher.isParsing(job)) {
-            parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
-          }
+      {
+        if (Fetcher.isStoringContent(job)) {
+          contentOut = new MapFile.Writer(job, fs, content.toString(),
+              Text.class, Content.class, compType, progress);
         }
 
-        public void write(Text key, NutchWritable value)
-          throws IOException {
-
-          Writable w = value.get();
-          
-          if (w instanceof CrawlDatum)
-            fetchOut.append(key, w);
-          else if (w instanceof Content && contentOut != null)
-            contentOut.append(key, w);
-          else if (w instanceof Parse && parseOut != null)
-            parseOut.write(key, (Parse)w);
+        if (Fetcher.isParsing(job)) {
+          parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name,
+              progress);
         }
+      }
+
+      public void write(Text key, NutchWritable value) throws IOException {
 
-        public void close(Reporter reporter) throws IOException {
-          fetchOut.close();
-          if (contentOut != null) {
-            contentOut.close();
-          }
-          if (parseOut != null) {
-            parseOut.close(reporter);
-          }
+        Writable w = value.get();
+
+        if (w instanceof CrawlDatum)
+          fetchOut.append(key, w);
+        else if (w instanceof Content && contentOut != null)
+          contentOut.append(key, w);
+        else if (w instanceof Parse && parseOut != null)
+          parseOut.write(key, (Parse) w);
+      }
+
+      public void close(Reporter reporter) throws IOException {
+        fetchOut.close();
+        if (contentOut != null) {
+          contentOut.close();
+        }
+        if (parseOut != null) {
+          parseOut.close(reporter);
         }
+      }
 
-      };
+    };
 
-  }      
+  }
 }
-