Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:49:02 UTC

[46/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java
new file mode 100644
index 0000000..aad9ee9
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.util.*;
+
+/**
+ * A queue-based fetcher.
+ * 
+ * <p>
+ * This fetcher uses a well-known model of one producer (a QueueFeeder) and many
+ * consumers (FetcherThread-s).
+ * 
+ * <p>
+ * QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s,
+ * which hold FetchItem-s that describe the items to be fetched. There are as
+ * many queues as there are unique hosts, but at any given time the total number
+ * of fetch items in all queues is less than a fixed number (currently set to a
+ * multiple of the number of threads).
+ * 
+ * <p>
+ * As items are consumed from the queues, the QueueFeeder continues to add new
+ * input items, so that their total count stays fixed (FetcherThread-s may also
+ * add new items to the queues, e.g. as a result of redirection) - until all
+ * input items are exhausted, at which point the number of items in the queues
+ * begins to decrease. When this number reaches 0, the fetcher finishes.
+ * 
+ * <p>
+ * This fetcher implementation handles per-host blocking itself, instead of
+ * delegating this work to protocol-specific plugins. Each per-host queue
+ * handles its own "politeness" settings, such as the maximum number of
+ * concurrent requests and the crawl delay between consecutive requests, and
+ * also keeps a list of requests in progress and the time the last request
+ * finished. As FetcherThread-s ask for new items to be fetched, queues may
+ * return eligible items, or null if for "politeness" reasons this host's
+ * queue is not yet ready.
+ * 
+ * <p>
+ * If there are still unfetched items in the queues, but none of the items are
+ * ready, FetcherThread-s will spin-wait until either some items become
+ * available, or a timeout is reached (at which point the Fetcher will abort,
+ * assuming the task is hung).
+ * 
+ * @author Andrzej Bialecki
+ */
+public class Fetcher extends NutchTool implements Tool,
+MapRunnable<Text, CrawlDatum, Text, NutchWritable> {
+
+  public static final int PERM_REFRESH_TIME = 5;
+
+  public static final String CONTENT_REDIR = "content";
+
+  public static final String PROTOCOL_REDIR = "protocol";
+
+  public static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
+
+  public static class InputFormat extends
+  SequenceFileInputFormat<Text, CrawlDatum> {
+    /** Don't split inputs, to keep things polite. */
+    public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException {
+      FileStatus[] files = listStatus(job);
+      FileSplit[] splits = new FileSplit[files.length];
+      for (int i = 0; i < files.length; i++) {
+        FileStatus cur = files[i];
+        splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(),
+            (String[]) null);
+      }
+      return splits;
+    }
+  }
+
+  @SuppressWarnings("unused")
+  private OutputCollector<Text, NutchWritable> output;
+  private Reporter reporter;
+
+  private String segmentName;
+  private AtomicInteger activeThreads = new AtomicInteger(0);
+  private AtomicInteger spinWaiting = new AtomicInteger(0);
+
+  private long start = System.currentTimeMillis(); // start time of fetcher run
+  private AtomicLong lastRequestStart = new AtomicLong(start);
+
+  private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
+  private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
+  private AtomicInteger errors = new AtomicInteger(0); // total pages errored
+
+  private boolean storingContent;
+  private boolean parsing;
+  FetchItemQueues fetchQueues;
+  QueueFeeder feeder;
+
+  LinkedList<FetcherThread> fetcherThreads = new LinkedList<FetcherThread>();
+
+  public Fetcher() {
+    super(null);
+  }
+
+  public Fetcher(Configuration conf) {
+    super(conf);
+  }
+
+  private void reportStatus(int pagesLastSec, int bytesLastSec)
+      throws IOException {
+    StringBuilder status = new StringBuilder();
+    long elapsed = Math.max((System.currentTimeMillis() - start) / 1000, 1);
+
+    float avgPagesSec = (float) pages.get() / elapsed;
+    long avgBytesSec = (bytes.get() / 128l) / elapsed;
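+    // 128 bytes = 1 kibibit, hence the "kbits/s" unit in the status string below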
+
+    status.append(activeThreads).append(" threads (").append(spinWaiting.get())
+    .append(" waiting), ");
+    status.append(fetchQueues.getQueueCount()).append(" queues, ");
+    status.append(fetchQueues.getTotalSize()).append(" URLs queued, ");
+    status.append(pages).append(" pages, ").append(errors).append(" errors, ");
+    status.append(String.format("%.2f", avgPagesSec)).append(" pages/s (");
+    status.append(pagesLastSec).append(" last sec), ");
+    status.append(avgBytesSec).append(" kbits/s (")
+    .append((bytesLastSec / 128)).append(" last sec)");
+
+    reporter.setStatus(status.toString());
+  }
+
+  public void configure(JobConf job) {
+    setConf(job);
+
+    this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
+    this.storingContent = isStoringContent(job);
+    this.parsing = isParsing(job);
+
+    // if (job.getBoolean("fetcher.verbose", false)) {
+    // LOG.setLevel(Level.FINE);
+    // }
+  }
+
+  public void close() {
+  }
+
+  public static boolean isParsing(Configuration conf) {
+    return conf.getBoolean("fetcher.parse", true);
+  }
+
+  public static boolean isStoringContent(Configuration conf) {
+    return conf.getBoolean("fetcher.store.content", true);
+  }
+
+  public void run(RecordReader<Text, CrawlDatum> input,
+      OutputCollector<Text, NutchWritable> output, Reporter reporter)
+          throws IOException {
+
+    this.output = output;
+    this.reporter = reporter;
+    this.fetchQueues = new FetchItemQueues(getConf());
+
+    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: threads: {}", threadCount);
+    }
+
+    int timeoutDivisor = getConf().getInt("fetcher.threads.timeout.divisor", 2);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor);
+    }
+
+    int queueDepthMultiplier = getConf().getInt(
+        "fetcher.queue.depth.multiplier", 50);
+
+    feeder = new QueueFeeder(input, fetchQueues, threadCount
+        * queueDepthMultiplier);
+    // feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
+
+    // the value of the time limit is either -1 or the time where it should
+    // finish
+    long timelimit = getConf().getLong("fetcher.timelimit", -1);
+    if (timelimit != -1)
+      feeder.setTimeLimit(timelimit);
+    feeder.start();
+
+    // set non-blocking & no-robots mode for HTTP protocol plugins.
+    getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
+    getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
+
+    for (int i = 0; i < threadCount; i++) { // spawn threads
+      FetcherThread t = new FetcherThread(getConf(), getActiveThreads(), fetchQueues, 
+          feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName,
+          parsing, output, storingContent, pages, bytes);
+      fetcherThreads.add(t);
+      t.start();
+    }
+
+    // select a timeout that avoids a task timeout
+    long timeout = getConf().getInt("mapred.task.timeout", 10 * 60 * 1000)
+        / timeoutDivisor;
+
+    // Used for threshold check, holds pages and bytes processed in the last
+    // second
+    int pagesLastSec;
+    int bytesLastSec;
+
+    int throughputThresholdNumRetries = 0;
+
+    int throughputThresholdPages = getConf().getInt(
+        "fetcher.throughput.threshold.pages", -1);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages);
+    }
+    int throughputThresholdMaxRetries = getConf().getInt(
+        "fetcher.throughput.threshold.retries", 5);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: throughput threshold retries: {}",
+          throughputThresholdMaxRetries);
+    }
+    long throughputThresholdTimeLimit = getConf().getLong(
+        "fetcher.throughput.threshold.check.after", -1);
+
+    int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 1000;
+    int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", threadCount);
+    if (maxNumThreads < threadCount) {
+      LOG.info("fetcher.maxNum.threads can't be < than {} : using {} instead",
+          threadCount, threadCount);
+      maxNumThreads = threadCount;
+    }
+    int bandwidthTargetCheckEveryNSecs = getConf().getInt(
+        "fetcher.bandwidth.target.check.everyNSecs", 30);
+    if (bandwidthTargetCheckEveryNSecs < 1) {
+      LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : using 1 instead");
+      bandwidthTargetCheckEveryNSecs = 1;
+    }
+
+    int maxThreadsPerQueue = getConf().getInt("fetcher.threads.per.queue", 1);
+
+    int bandwidthTargetCheckCounter = 0;
+    long bytesAtLastBWTCheck = 0l;
+
+    do { // wait for threads to exit
+      pagesLastSec = pages.get();
+      bytesLastSec = (int) bytes.get();
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+
+      pagesLastSec = pages.get() - pagesLastSec;
+      bytesLastSec = (int) bytes.get() - bytesLastSec;
+
+      reporter.incrCounter("FetcherStatus", "bytes_downloaded", bytesLastSec);
+
+      reportStatus(pagesLastSec, bytesLastSec);
+
+      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting="
+          + spinWaiting.get() + ", fetchQueues.totalSize="
+          + fetchQueues.getTotalSize() + ", fetchQueues.getQueueCount="
+          + fetchQueues.getQueueCount());
+
+      if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
+        fetchQueues.dump();
+      }
+
+      // if throughput threshold is enabled
+      if (throughputThresholdTimeLimit < System.currentTimeMillis()
+          && throughputThresholdPages != -1) {
+        // Check if we're dropping below the threshold
+        if (pagesLastSec < throughputThresholdPages) {
+          throughputThresholdNumRetries++;
+          LOG.warn("{}: dropping below configured threshold of {} pages per second",
+              Integer.toString(throughputThresholdNumRetries), Integer.toString(throughputThresholdPages));
+
+          // Quit if we dropped below threshold too many times
+          if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
+            LOG.warn("Dropped below threshold too many times, killing!");
+
+            // Disable the threshold checker
+            throughputThresholdPages = -1;
+
+            // Empty the queues cleanly and get number of items that were
+            // dropped
+            int hitByThroughputThreshold = fetchQueues.emptyQueues();
+
+            if (hitByThroughputThreshold != 0)
+              reporter.incrCounter("FetcherStatus", "hitByThroughputThreshold",
+                  hitByThroughputThreshold);
+          }
+        }
+      }
+
+      // adjust the number of threads if a target bandwidth has been set
+      if (targetBandwidth > 0) {
+        if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs)
+          bandwidthTargetCheckCounter++;
+        else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
+          long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)
+              / bandwidthTargetCheckEveryNSecs;
+
+          bytesAtLastBWTCheck = bytes.get();
+          bandwidthTargetCheckCounter = 0;
+
+          int averageBdwPerThread = 0;
+          if (activeThreads.get() > 0)
+            averageBdwPerThread = Math.round(bpsSinceLastCheck
+                / activeThreads.get());
+
+          LOG.info("averageBdwPerThread : {} kbps", (averageBdwPerThread / 1000));
+
+          if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0) {
+            // only add threads if there are more queue slots
+            // (queues * fetcher.threads.per.queue) than active threads
+
+            if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads
+                .get()) {
+
+              long remainingBdw = targetBandwidth - bpsSinceLastCheck;
+              int additionalThreads = Math.round(remainingBdw
+                  / averageBdwPerThread);
+              int availableThreads = maxNumThreads - activeThreads.get();
+
+              // cap the number of additional threads at the number of
+              // available thread slots
+              additionalThreads = (availableThreads < additionalThreads ? availableThreads
+                  : additionalThreads);
+              LOG.info("Has space for more threads ({} vs {} kbps) \t=> adding {} new threads",
+                  (bpsSinceLastCheck / 1000), (targetBandwidth / 1000), additionalThreads);
+              // activate new threads
+              for (int i = 0; i < additionalThreads; i++) {
+                FetcherThread thread = new FetcherThread(getConf(), getActiveThreads(), fetchQueues, 
+                    feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName, parsing,
+                    output, storingContent, pages, bytes);
+                fetcherThreads.add(thread);
+                thread.start();
+              }
+            }
+          } else if (bpsSinceLastCheck > targetBandwidth
+              && averageBdwPerThread > 0) {
+            // if the bandwidth we're using is greater than the target
+            // bandwidth, we have to stop some threads
+            long excessBdw = bpsSinceLastCheck - targetBandwidth;
+            int excessThreads = Math.round(excessBdw / averageBdwPerThread);
+            LOG.info("Exceeding target bandwidth ({} vs {} kbps). \t=> excessThreads = {}",
+                bpsSinceLastCheck / 1000, (targetBandwidth / 1000), excessThreads);
+            // keep at least one
+            if (excessThreads >= fetcherThreads.size())
+              excessThreads = 0;
+            // de-activate threads
+            for (int i = 0; i < excessThreads; i++) {
+              FetcherThread thread = fetcherThreads.removeLast();
+              thread.setHalted(true);
+            }
+          }
+        }
+      }
+
+      // check timelimit
+      if (!feeder.isAlive()) {
+        int hitByTimeLimit = fetchQueues.checkTimelimit();
+        if (hitByTimeLimit != 0)
+          reporter.incrCounter("FetcherStatus", "hitByTimeLimit",
+              hitByTimeLimit);
+      }
+
+      // some requests seem to hang, despite all intentions
+      if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Aborting with {} hung threads.", activeThreads);
+          for (int i = 0; i < fetcherThreads.size(); i++) {
+            FetcherThread thread = fetcherThreads.get(i);
+            if (thread.isAlive()) {
+              LOG.warn("Thread #{} hung while processing {}", i, thread.getReprUrl());
+              if (LOG.isDebugEnabled()) {
+                StackTraceElement[] stack = thread.getStackTrace();
+                StringBuilder sb = new StringBuilder();
+                sb.append("Stack of thread #").append(i).append(":\n");
+                for (StackTraceElement s : stack) {
+                  sb.append(s.toString()).append('\n');
+                }
+                LOG.debug(sb.toString());
+              }
+            }
+          }
+        }
+        return;
+      }
+
+    } while (activeThreads.get() > 0);
+    LOG.info("-activeThreads={}", activeThreads);
+
+  }
+
+  public void fetch(Path segment, int threads) throws IOException {
+
+    checkConfiguration();
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Fetcher: starting at {}", sdf.format(start));
+      LOG.info("Fetcher: segment: {}", segment);
+    }
+
+    // set the actual time for the timelimit relative to the beginning of the
+    // whole job and not of a specific task; otherwise a failed task would
+    // restart the clock when it is retried
+    long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
+    if (timelimit != -1) {
+      timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
+      LOG.info("Fetcher Timelimit set for : {}", timelimit);
+      getConf().setLong("fetcher.timelimit", timelimit);
+    }
+
+    // Set the time limit after which the throughput threshold feature is
+    // enabled
+    timelimit = getConf().getLong("fetcher.throughput.threshold.check.after",
+        10);
+    timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
+    getConf().setLong("fetcher.throughput.threshold.check.after", timelimit);
+
+    int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1);
+    if (maxOutlinkDepth > 0) {
+      LOG.info("Fetcher: following outlinks up to depth: {}",
+          Integer.toString(maxOutlinkDepth));
+
+      int maxOutlinkDepthNumLinks = getConf().getInt(
+          "fetcher.follow.outlinks.num.links", 4);
+      int outlinksDepthDivisor = getConf().getInt(
+          "fetcher.follow.outlinks.depth.divisor", 2);
+
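+      // Worked example with the defaults above (num.links=4, divisor=2):
+      // depth 1 contributes floor(2/1)*4=8 links, depth 2 floor(2/2)*4=4,
+      // depth 3 floor(2/3)*4=0 (integer division), so the total grows as
+      // 8, 12, 12, ... with increasing maxOutlinkDepth.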
+      int totalOutlinksToFollow = 0;
+      for (int i = 0; i < maxOutlinkDepth; i++) {
+        totalOutlinksToFollow += (int) Math.floor(outlinksDepthDivisor
+            / (i + 1) * maxOutlinkDepthNumLinks);
+      }
+
+      LOG.info("Fetcher: maximum outlinks to follow: {}",
+          Integer.toString(totalOutlinksToFollow));
+    }
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("fetch " + segment);
+
+    job.setInt("fetcher.threads.fetch", threads);
+    job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
+
+    // for politeness, don't permit parallel execution of a single task
+    job.setSpeculativeExecution(false);
+
+    FileInputFormat.addInputPath(job, new Path(segment,
+        CrawlDatum.GENERATE_DIR_NAME));
+    job.setInputFormat(InputFormat.class);
+
+    job.setMapRunnerClass(Fetcher.class);
+
+    FileOutputFormat.setOutputPath(job, segment);
+    job.setOutputFormat(FetcherOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NutchWritable.class);
+
+    JobClient.runJob(job);
+
+    long end = System.currentTimeMillis();
+    LOG.info("Fetcher: finished at {}, elapsed: {}", sdf.format(end),
+        TimingUtil.elapsedTime(start, end));
+  }
+
+  /** Run the fetcher. */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+
+    String usage = "Usage: Fetcher <segment> [-threads n]";
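+    // e.g. via the Nutch launcher script (paths illustrative):
+    //   bin/nutch fetch crawl/segments/20160716194902 -threads 50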
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      return -1;
+    }
+
+    Path segment = new Path(args[0]);
+
+    int threads = getConf().getInt("fetcher.threads.fetch", 10);
+
+    for (int i = 1; i < args.length; i++) { // parse command line
+      if (args[i].equals("-threads")) { // found -threads option
+        threads = Integer.parseInt(args[++i]);
+      }
+    }
+
+    getConf().setInt("fetcher.threads.fetch", threads);
+
+    try {
+      fetch(segment, threads);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("Fetcher: {}", StringUtils.stringifyException(e));
+      return -1;
+    }
+
+  }
+
+  private void checkConfiguration() {
+    // ensure that a value has been set for the agent name
+    String agentName = getConf().get("http.agent.name");
+    if (agentName == null || agentName.trim().length() == 0) {
+      String message = "Fetcher: No agents listed in 'http.agent.name'"
+          + " property.";
+      if (LOG.isErrorEnabled()) {
+        LOG.error(message);
+      }
+      throw new IllegalArgumentException(message);
+    }
+  }
+
+  private AtomicInteger getActiveThreads() {
+    return activeThreads;
+  }
+
+  @Override
+  public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
+
+    Map<String, Object> results = new HashMap<String, Object>();
+
+    Path segment;
+    if(args.containsKey(Nutch.ARG_SEGMENT)) {
+      Object seg = args.get(Nutch.ARG_SEGMENT);
+      if(seg instanceof Path) {
+        segment = (Path) seg;
+      }
+      else {
+        segment = new Path(seg.toString());
+      }
+    }
+    else {
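+      // no segment argument given: pick the most recently modified
+      // segment directory under <crawlId>/segments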
+      String segmentDir = crawlId + "/segments";
+      File segmentsDir = new File(segmentDir);
+      File[] segmentsList = segmentsDir.listFiles();
+      Arrays.sort(segmentsList, new Comparator<File>() {
+        @Override
+        public int compare(File f1, File f2) {
+          return Long.compare(f2.lastModified(), f1.lastModified());
+        }
+      });
+      segment = new Path(segmentsList[0].getPath());
+    }
+
+
+    int threads = getConf().getInt("fetcher.threads.fetch", 10);
+
+    // an optional "threads" argument overrides fetcher.threads.fetch
+    if (args.containsKey("threads")) {
+      threads = Integer.parseInt((String)args.get("threads"));
+    }
+    getConf().setInt("fetcher.threads.fetch", threads);
+
+    try {
+      fetch(segment, threads);
+      results.put(Nutch.VAL_RESULT, Integer.toString(0));
+      return results;
+    } catch (Exception e) {
+      LOG.error("Fetcher: {}", StringUtils.stringifyException(e));
+      results.put(Nutch.VAL_RESULT, Integer.toString(-1));
+      return results;
+    }
+  }
+
+}
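
To make the producer/consumer model described in the class javadoc concrete, here is a
minimal, Nutch-independent sketch of one feeder filling per-host queues while several
worker threads drain them. All class, method and URL names below are illustrative only
and are not part of this patch; the real QueueFeeder and FetcherThread additionally
enforce per-queue politeness (crawl delay, concurrent request limits) and the timeout
and bandwidth handling seen in run() above.

    import java.util.List;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class QueueModelSketch {
      static final Map<String, Queue<String>> QUEUES = new ConcurrentHashMap<>();
      static final AtomicBoolean FEEDING = new AtomicBoolean(true);

      public static void main(String[] args) throws InterruptedException {
        Thread feeder = new Thread(() -> {   // the "QueueFeeder": one queue per host
          for (String url : List.of("http://a.example/1", "http://a.example/2",
              "http://b.example/1")) {
            String host = url.split("/")[2];
            QUEUES.computeIfAbsent(host, h -> new ConcurrentLinkedQueue<>()).add(url);
          }
          FEEDING.set(false);                // input exhausted
        });
        Runnable worker = () -> {            // a "FetcherThread"
          while (true) {
            String url = null;
            for (Queue<String> q : QUEUES.values()) {
              if ((url = q.poll()) != null) break;
            }
            if (url != null) {
              System.out.println(Thread.currentThread().getName() + " fetched " + url);
            } else if (!FEEDING.get()
                && QUEUES.values().stream().allMatch(Queue::isEmpty)) {
              return;                        // nothing left and feeder is done
            } else {
              try { Thread.sleep(100); } catch (InterruptedException e) { return; } // spin-wait
            }
          }
        };
        feeder.start();
        Thread t1 = new Thread(worker, "fetcher-1");
        Thread t2 = new Thread(worker, "fetcher-2");
        t1.start(); t2.start();
        feeder.join(); t1.join(); t2.join();
      }
    }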

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
new file mode 100644
index 0000000..d526a07
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.MapFile.Writer.Option;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseOutputFormat;
+import org.apache.nutch.protocol.Content;
+
+/** Splits FetcherOutput entries into multiple map files. */
+public class FetcherOutputFormat implements OutputFormat<Text, NutchWritable> {
+
+  public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
+    Path out = FileOutputFormat.getOutputPath(job);
+    if ((out == null) && (job.getNumReduceTasks() != 0)) {
+      throw new InvalidJobConfException("Output directory not set in JobConf.");
+    }
+    if (fs == null) {
+      fs = out.getFileSystem(job);
+    }
+    if (fs.exists(new Path(out, CrawlDatum.FETCH_DIR_NAME)))
+      throw new IOException("Segment already fetched!");
+  }
+
+  public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
+      final JobConf job, final String name, final Progressable progress)
+          throws IOException {
+
+    Path out = FileOutputFormat.getOutputPath(job);
+    final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
+    final Path content = new Path(new Path(out, Content.DIR_NAME), name);
+
+    final CompressionType compType = SequenceFileOutputFormat
+        .getOutputCompressionType(job);
+
+    Option fKeyClassOpt = MapFile.Writer.keyClass(Text.class);
+    org.apache.hadoop.io.SequenceFile.Writer.Option fValClassOpt = SequenceFile.Writer.valueClass(CrawlDatum.class);
+    org.apache.hadoop.io.SequenceFile.Writer.Option fProgressOpt = SequenceFile.Writer.progressable(progress);
+    org.apache.hadoop.io.SequenceFile.Writer.Option fCompOpt = SequenceFile.Writer.compression(compType);
+
+    final MapFile.Writer fetchOut = new MapFile.Writer(job,
+        fetch, fKeyClassOpt, fValClassOpt, fCompOpt, fProgressOpt);
+
+    return new RecordWriter<Text, NutchWritable>() {
+      private MapFile.Writer contentOut;
+      private RecordWriter<Text, Parse> parseOut;
+
+      {
+        if (Fetcher.isStoringContent(job)) {
+          Option cKeyClassOpt = MapFile.Writer.keyClass(Text.class);
+          org.apache.hadoop.io.SequenceFile.Writer.Option cValClassOpt = SequenceFile.Writer.valueClass(Content.class);
+          org.apache.hadoop.io.SequenceFile.Writer.Option cProgressOpt = SequenceFile.Writer.progressable(progress);
+          org.apache.hadoop.io.SequenceFile.Writer.Option cCompOpt = SequenceFile.Writer.compression(compType);
+          contentOut = new MapFile.Writer(job, content,
+              cKeyClassOpt, cValClassOpt, cCompOpt, cProgressOpt);
+        }
+
+        if (Fetcher.isParsing(job)) {
+          parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name,
+              progress);
+        }
+      }
+
+      public void write(Text key, NutchWritable value) throws IOException {
+
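+        // Dispatch on the wrapped type: CrawlDatum goes to the crawl_fetch
+        // MapFile, Content to the content MapFile (when stored), and Parse
+        // to the parse record writer.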
+        Writable w = value.get();
+
+        if (w instanceof CrawlDatum)
+          fetchOut.append(key, w);
+        else if (w instanceof Content && contentOut != null)
+          contentOut.append(key, w);
+        else if (w instanceof Parse && parseOut != null)
+          parseOut.write(key, (Parse) w);
+      }
+
+      public void close(Reporter reporter) throws IOException {
+        fetchOut.close();
+        if (contentOut != null) {
+          contentOut.close();
+        }
+        if (parseOut != null) {
+          parseOut.close(reporter);
+        }
+      }
+
+    };
+
+  }
+}
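
For orientation: together with ParseOutputFormat (used when fetcher.parse is true),
this output format gives a fetched segment its usual layout, roughly:

    <segment>/crawl_fetch/   fetch status per URL (CrawlDatum, written above)
    <segment>/content/       raw content (only if fetcher.store.content is true)
    <segment>/crawl_parse/, parse_data/, parse_text/   written by ParseOutputFormat

(crawl_generate/ is created earlier by the Generator; directory names other than
crawl_fetch and content come from those classes, not from this file.)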

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java
new file mode 100644
index 0000000..e57e735
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLExemptionFilters;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseOutputFormat;
+import org.apache.nutch.parse.ParseResult;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseUtil;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.Protocol;
+import org.apache.nutch.protocol.ProtocolFactory;
+import org.apache.nutch.protocol.ProtocolOutput;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.service.NutchServer;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import crawlercommons.robots.BaseRobotRules;
+
+/**
+ * This class picks items from queues and fetches the pages.
+ */
+public class FetcherThread extends Thread {
+  
+  private static final Logger LOG = LoggerFactory.getLogger(FetcherThread.class);
+
+  private Configuration conf;
+  private URLFilters urlFilters;
+  private URLExemptionFilters urlExemptionFilters;
+  private ScoringFilters scfilters;
+  private ParseUtil parseUtil;
+  private URLNormalizers normalizers;
+  private ProtocolFactory protocolFactory;
+  private long maxCrawlDelay;
+  private String queueMode;
+  private int maxRedirect;
+  private String reprUrl;
+  private boolean redirecting;
+  private int redirectCount;
+  private boolean ignoreInternalLinks;
+  private boolean ignoreExternalLinks;
+  private String ignoreExternalLinksMode;
+
+  // Used by fetcher.follow.outlinks.depth in parse
+  private int maxOutlinksPerPage;
+  private final int maxOutlinks;
+  private final int interval;
+  private int maxOutlinkDepth;
+  private int maxOutlinkDepthNumLinks;
+  private boolean outlinksIgnoreExternal;
+
+  private int outlinksDepthDivisor;
+  private boolean skipTruncated;
+
+  private boolean halted = false;
+
+  private AtomicInteger activeThreads;
+
+  private Object fetchQueues;
+
+  private QueueFeeder feeder;
+
+  private Object spinWaiting;
+
+  private AtomicLong lastRequestStart;
+
+  private Reporter reporter;
+
+  private AtomicInteger errors;
+
+  private String segmentName;
+
+  private boolean parsing;
+
+  private OutputCollector<Text, NutchWritable> output;
+
+  private boolean storingContent;
+
+  private AtomicInteger pages;
+
+  private AtomicLong bytes;
+  
+  //Used by the REST service
+  private FetchNode fetchNode;
+  private boolean reportToNutchServer;
+
+  public FetcherThread(Configuration conf, AtomicInteger activeThreads, FetchItemQueues fetchQueues, 
+      QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong lastRequestStart, Reporter reporter,
+      AtomicInteger errors, String segmentName, boolean parsing, OutputCollector<Text, NutchWritable> output,
+      boolean storingContent, AtomicInteger pages, AtomicLong bytes) {
+    this.setDaemon(true); // don't hang JVM on exit
+    this.setName("FetcherThread"); // use an informative name
+    this.conf = conf;
+    this.urlFilters = new URLFilters(conf);
+    this.urlExemptionFilters = new URLExemptionFilters(conf);
+    this.scfilters = new ScoringFilters(conf);
+    this.parseUtil = new ParseUtil(conf);
+    this.skipTruncated = conf.getBoolean(ParseSegment.SKIP_TRUNCATED, true);
+    this.protocolFactory = new ProtocolFactory(conf);
+    this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
+    this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+    this.activeThreads = activeThreads;
+    this.fetchQueues = fetchQueues;
+    this.feeder = feeder;
+    this.spinWaiting = spinWaiting;
+    this.lastRequestStart = lastRequestStart;
+    this.reporter = reporter;
+    this.errors = errors;
+    this.segmentName = segmentName;
+    this.parsing = parsing;
+    this.output = output;
+    this.storingContent = storingContent;
+    this.pages = pages;
+    this.bytes = bytes;
+    queueMode = conf.get("fetcher.queue.mode",
+        FetchItemQueues.QUEUE_MODE_HOST);
+    // check that the mode is known
+    if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP)
+        && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
+        && !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) {
+      LOG.error("Unknown partition mode : " + queueMode
+          + " - forcing to byHost");
+      queueMode = FetchItemQueues.QUEUE_MODE_HOST;
+    }
+    LOG.info("Using queue mode : " + queueMode);
+    this.maxRedirect = conf.getInt("http.redirect.max", 3);
+
+    maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
+    maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE
+        : maxOutlinksPerPage;
+    interval = conf.getInt("db.fetch.interval.default", 2592000);
+    ignoreInternalLinks = conf.getBoolean("db.ignore.internal.links", false);
+    ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
+    ignoreExternalLinksMode = conf.get("db.ignore.external.links.mode", "byHost");
+    maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1);
+    outlinksIgnoreExternal = conf.getBoolean(
+        "fetcher.follow.outlinks.ignore.external", false);
+    maxOutlinkDepthNumLinks = conf.getInt(
+        "fetcher.follow.outlinks.num.links", 4);
+    outlinksDepthDivisor = conf.getInt(
+        "fetcher.follow.outlinks.depth.divisor", 2);
+  }
+
+  @SuppressWarnings("fallthrough")
+  public void run() {
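+    // Main fetch loop: take a FetchItem from the shared queues, check
+    // robots.txt and crawl-delay rules, fetch via the protocol plugin,
+    // then handle the ProtocolStatus (success, redirects, retries, errors)
+    // and write the result through output(). Exits when halted, or when the
+    // feeder is done and the queues are empty.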
+    activeThreads.incrementAndGet(); // count threads
+
+    FetchItem fit = null;
+    try {
+      // report to the NutchServer only when it is running and fetcher.parse is true
+      if (parsing && NutchServer.getInstance().isRunning())
+        reportToNutchServer = true;
+      
+      while (true) {
+        // creating FetchNode for storing in FetchNodeDb
+        if (reportToNutchServer)
+          this.fetchNode = new FetchNode();
+        else
+          this.fetchNode = null;
+
+        // check whether this thread must be stopped
+        if (isHalted()) {
+          LOG.debug(getName() + " set to halted");
+          fit = null;
+          return;
+        }
+
+        fit = ((FetchItemQueues) fetchQueues).getFetchItem();
+        if (fit == null) {
+          if (feeder.isAlive() || ((FetchItemQueues) fetchQueues).getTotalSize() > 0) {
+            LOG.debug(getName() + " spin-waiting ...");
+            // spin-wait.
+            ((AtomicInteger) spinWaiting).incrementAndGet();
+            try {
+              Thread.sleep(500);
+            } catch (Exception e) {
+            }
+            ((AtomicInteger) spinWaiting).decrementAndGet();
+            continue;
+          } else {
+            // all done, finish this thread
+            LOG.info("Thread " + getName() + " has no more work available");
+            return;
+          }
+        }
+        lastRequestStart.set(System.currentTimeMillis());
+        Text reprUrlWritable = (Text) fit.datum.getMetaData().get(
+            Nutch.WRITABLE_REPR_URL_KEY);
+        if (reprUrlWritable == null) {
+          setReprUrl(fit.url.toString());
+        } else {
+          setReprUrl(reprUrlWritable.toString());
+        }
+        try {
+          // fetch the page
+          redirecting = false;
+          redirectCount = 0;
+          do {
+            if (LOG.isInfoEnabled()) {
+              LOG.info("fetching " + fit.url + " (queue crawl delay="
+                  + ((FetchItemQueues) fetchQueues).getFetchItemQueue(fit.queueID).crawlDelay
+                  + "ms)");
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("redirectCount=" + redirectCount);
+            }
+            redirecting = false;
+            Protocol protocol = this.protocolFactory.getProtocol(fit.url
+                .toString());
+            BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
+            if (!rules.isAllowed(fit.u.toString())) {
+              // unblock
+              ((FetchItemQueues) fetchQueues).finishFetchItem(fit, true);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Denied by robots.txt: " + fit.url);
+              }
+              output(fit.url, fit.datum, null,
+                  ProtocolStatus.STATUS_ROBOTS_DENIED,
+                  CrawlDatum.STATUS_FETCH_GONE);
+              reporter.incrCounter("FetcherStatus", "robots_denied", 1);
+              continue;
+            }
+            if (rules.getCrawlDelay() > 0) {
+              if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
+                // unblock
+                ((FetchItemQueues) fetchQueues).finishFetchItem(fit, true);
+                LOG.debug("Crawl-Delay for " + fit.url + " too long ("
+                    + rules.getCrawlDelay() + "), skipping");
+                output(fit.url, fit.datum, null,
+                    ProtocolStatus.STATUS_ROBOTS_DENIED,
+                    CrawlDatum.STATUS_FETCH_GONE);
+                reporter.incrCounter("FetcherStatus",
+                    "robots_denied_maxcrawldelay", 1);
+                continue;
+              } else {
+                FetchItemQueue fiq = ((FetchItemQueues) fetchQueues)
+                    .getFetchItemQueue(fit.queueID);
+                fiq.crawlDelay = rules.getCrawlDelay();
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Crawl delay for queue: " + fit.queueID
+                      + " is set to " + fiq.crawlDelay
+                      + " as per robots.txt. url: " + fit.url);
+                }
+              }
+            }
+            ProtocolOutput output = protocol.getProtocolOutput(fit.url,
+                fit.datum);
+            ProtocolStatus status = output.getStatus();
+            Content content = output.getContent();
+            ParseStatus pstatus = null;
+            // unblock queue
+            ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+
+            String urlString = fit.url.toString();
+            
+            // used for FetchNode
+            if (fetchNode != null) {
+              fetchNode.setStatus(status.getCode());
+              fetchNode.setFetchTime(System.currentTimeMillis());
+              fetchNode.setUrl(fit.url);
+            }
+
+            reporter.incrCounter("FetcherStatus", status.getName(), 1);
+
+            switch (status.getCode()) {
+
+            case ProtocolStatus.WOULDBLOCK:
+              // retry ?
+              ((FetchItemQueues) fetchQueues).addFetchItem(fit);
+              break;
+
+            case ProtocolStatus.SUCCESS: // got a page
+              pstatus = output(fit.url, fit.datum, content, status,
+                  CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
+              updateStatus(content.getContent().length);
+              if (pstatus != null && pstatus.isSuccess()
+                  && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+                String newUrl = pstatus.getMessage();
+                int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
+                Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+                    newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME,
+                    Fetcher.CONTENT_REDIR);
+                if (redirUrl != null) {
+                  fit = queueRedirect(redirUrl, fit);
+                }
+              }
+              break;
+
+            case ProtocolStatus.MOVED: // redirect
+            case ProtocolStatus.TEMP_MOVED:
+              int code;
+              boolean temp;
+              if (status.getCode() == ProtocolStatus.MOVED) {
+                code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
+                temp = false;
+              } else {
+                code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
+                temp = true;
+              }
+              output(fit.url, fit.datum, content, status, code);
+              String newUrl = status.getMessage();
+              Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+                  newUrl, temp, Fetcher.PROTOCOL_REDIR);
+              if (redirUrl != null) {
+                fit = queueRedirect(redirUrl, fit);
+              } else {
+                // stop redirecting
+                redirecting = false;
+              }
+              break;
+
+            case ProtocolStatus.EXCEPTION:
+              logError(fit.url, status.getMessage());
+              int killedURLs = ((FetchItemQueues) fetchQueues).checkExceptionThreshold(fit
+                  .getQueueID());
+              if (killedURLs != 0)
+                reporter.incrCounter("FetcherStatus",
+                    "AboveExceptionThresholdInQueue", killedURLs);
+              /* FALLTHROUGH */
+            case ProtocolStatus.RETRY: // retry
+            case ProtocolStatus.BLOCKED:
+              output(fit.url, fit.datum, null, status,
+                  CrawlDatum.STATUS_FETCH_RETRY);
+              break;
+
+            case ProtocolStatus.GONE: // gone
+            case ProtocolStatus.NOTFOUND:
+            case ProtocolStatus.ACCESS_DENIED:
+            case ProtocolStatus.ROBOTS_DENIED:
+              output(fit.url, fit.datum, null, status,
+                  CrawlDatum.STATUS_FETCH_GONE);
+              break;
+
+            case ProtocolStatus.NOTMODIFIED:
+              output(fit.url, fit.datum, null, status,
+                  CrawlDatum.STATUS_FETCH_NOTMODIFIED);
+              break;
+
+            default:
+              if (LOG.isWarnEnabled()) {
+                LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+              }
+              output(fit.url, fit.datum, null, status,
+                  CrawlDatum.STATUS_FETCH_RETRY);
+            }
+
+            if (redirecting && redirectCount > maxRedirect) {
+              ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+              if (LOG.isInfoEnabled()) {
+                LOG.info(" - redirect count exceeded " + fit.url);
+              }
+              output(fit.url, fit.datum, null,
+                  ProtocolStatus.STATUS_REDIR_EXCEEDED,
+                  CrawlDatum.STATUS_FETCH_GONE);
+            }
+
+          } while (redirecting && (redirectCount <= maxRedirect));
+
+        } catch (Throwable t) { // unexpected exception
+          // unblock
+          ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+          logError(fit.url, StringUtils.stringifyException(t));
+          output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
+              CrawlDatum.STATUS_FETCH_RETRY);
+        }
+      }
+
+    } catch (Throwable e) {
+      if (LOG.isErrorEnabled()) {
+        LOG.error("fetcher caught:" + e.toString());
+      }
+    } finally {
+      if (fit != null)
+        ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+      activeThreads.decrementAndGet(); // count threads
+      LOG.info("-finishing thread " + getName() + ", activeThreads="
+          + activeThreads);
+    }
+  }
+
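+  /**
+   * Normalizes and filters a redirect target and applies the internal /
+   * external link policies; returns the URL to fetch immediately when
+   * http.redirect.max > 0, or null when the redirect is filtered, ignored,
+   * or only recorded as STATUS_LINKED for a later round.
+   */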
+  private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
+      String newUrl, boolean temp, String redirType)
+      throws MalformedURLException, URLFilterException {
+    newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+    newUrl = urlFilters.filter(newUrl);
+
+    try {
+      String origHost = new URL(urlString).getHost().toLowerCase();
+      String newHost = new URL(newUrl).getHost().toLowerCase();
+      if (ignoreExternalLinks) {
+        if (!origHost.equals(newHost)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(" - ignoring redirect " + redirType + " from "
+                + urlString + " to " + newUrl
+                + " because external links are ignored");
+          }
+          return null;
+        }
+      }
+      
+      if (ignoreInternalLinks) {
+        if (origHost.equals(newHost)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(" - ignoring redirect " + redirType + " from "
+                + urlString + " to " + newUrl
+                + " because internal links are ignored");
+          }
+          return null;
+        }
+      }
+    } catch (MalformedURLException e) { }
+    
+    if (newUrl != null && !newUrl.equals(urlString)) {
+      reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
+      url = new Text(newUrl);
+      if (maxRedirect > 0) {
+        redirecting = true;
+        redirectCount++;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(" - " + redirType + " redirect to " + url
+              + " (fetching now)");
+        }
+        return url;
+      } else {
+        CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
+            datum.getFetchInterval(), datum.getScore());
+        // transfer existing metadata
+        newDatum.getMetaData().putAll(datum.getMetaData());
+        try {
+          scfilters.initialScore(url, newDatum);
+        } catch (ScoringFilterException e) {
+          e.printStackTrace();
+        }
+        if (reprUrl != null) {
+          newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+              new Text(reprUrl));
+        }
+        output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(" - " + redirType + " redirect to " + url
+              + " (fetching later)");
+        }
+        return null;
+      }
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(" - " + redirType + " redirect skipped: "
+            + (newUrl != null ? "to same url" : "filtered"));
+      }
+      return null;
+    }
+  }
+
+  private FetchItem queueRedirect(Text redirUrl, FetchItem fit)
+      throws ScoringFilterException {
+    CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
+        fit.datum.getFetchInterval(), fit.datum.getScore());
+    // transfer all existing metadata to the redirect
+    newDatum.getMetaData().putAll(fit.datum.getMetaData());
+    scfilters.initialScore(redirUrl, newDatum);
+    if (reprUrl != null) {
+      newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+          new Text(reprUrl));
+    }
+    fit = FetchItem.create(redirUrl, newDatum, queueMode);
+    if (fit != null) {
+      FetchItemQueue fiq = ((FetchItemQueues) fetchQueues).getFetchItemQueue(fit.queueID);
+      fiq.addInProgressFetchItem(fit);
+    } else {
+      // stop redirecting
+      redirecting = false;
+      reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect",
+          1);
+    }
+    return fit;
+  }
+
+  private void logError(Text url, String message) {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("fetch of " + url + " failed with: " + message);
+    }
+    errors.incrementAndGet();
+  }
+
+  private ParseStatus output(Text key, CrawlDatum datum, Content content,
+      ProtocolStatus pstatus, int status) {
+
+    return output(key, datum, content, pstatus, status, 0);
+  }
+
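+  /**
+   * Writes the result of a fetch: updates the datum, optionally parses the
+   * content, computes the page signature, collects datum / content / parse
+   * records, and, when fetcher.follow.outlinks.depth is set, queues a bounded
+   * number of outlinks for fetching within the same run.
+   */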
+  private ParseStatus output(Text key, CrawlDatum datum, Content content,
+      ProtocolStatus pstatus, int status, int outlinkDepth) {
+
+    datum.setStatus(status);
+    datum.setFetchTime(System.currentTimeMillis());
+    if (pstatus != null)
+      datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+
+    ParseResult parseResult = null;
+    if (content != null) {
+      Metadata metadata = content.getMetadata();
+
+      // store the guessed content type in the crawldatum
+      if (content.getContentType() != null)
+        datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
+            new Text(content.getContentType()));
+
+      // add segment to metadata
+      metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
+      // add score to content metadata so that ParseSegment can pick it up.
+      try {
+        scfilters.passScoreBeforeParsing(key, datum, content);
+      } catch (Exception e) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+        }
+      }
+      /*
+       * Note: Fetcher will only follow meta-redirects coming from the
+       * original URL.
+       */
+      if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+        if (!skipTruncated
+            || (skipTruncated && !ParseSegment.isTruncated(content))) {
+          try {
+            parseResult = this.parseUtil.parse(content);
+          } catch (Exception e) {
+            LOG.warn("Error parsing: " + key + ": "
+                + StringUtils.stringifyException(e));
+          }
+        }
+
+        if (parseResult == null) {
+          byte[] signature = SignatureFactory.getSignature(conf)
+              .calculate(content, new ParseStatus().getEmptyParse(conf));
+          datum.setSignature(signature);
+        }
+      }
+
+      /*
+       * Store the status code in the content metadata so we can read this
+       * value during parsing (as a separate job) and decide whether to parse.
+       */
+      content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
+          Integer.toString(status));
+    }
+
+    try {
+      output.collect(key, new NutchWritable(datum));
+      if (content != null && storingContent)
+        output.collect(key, new NutchWritable(content));
+      if (parseResult != null) {
+        for (Entry<Text, Parse> entry : parseResult) {
+          Text url = entry.getKey();
+          Parse parse = entry.getValue();
+          ParseStatus parseStatus = parse.getData().getStatus();
+          ParseData parseData = parse.getData();
+
+          if (!parseStatus.isSuccess()) {
+            LOG.warn("Error parsing: " + key + ": " + parseStatus);
+            parse = parseStatus.getEmptyParse(conf);
+          }
+
+          // Calculate page signature. For non-parsing fetchers this will
+          // be done in ParseSegment
+          byte[] signature = SignatureFactory.getSignature(conf)
+              .calculate(content, parse);
+          // Ensure segment name and score are in parseData metadata
+          parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
+          parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
+              StringUtil.toHexString(signature));
+          // Pass fetch time to content meta
+          parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
+              Long.toString(datum.getFetchTime()));
+          if (url.equals(key))
+            datum.setSignature(signature);
+          try {
+            scfilters.passScoreAfterParsing(url, content, parse);
+          } catch (Exception e) {
+            if (LOG.isWarnEnabled()) {
+              LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+            }
+          }
+
+          String origin = null;
+
+          // collect outlinks for subsequent db update
+          Outlink[] links = parseData.getOutlinks();
+          int outlinksToStore = Math.min(maxOutlinks, links.length);
+          if (ignoreExternalLinks || ignoreInternalLinks) {
+            URL originURL = new URL(url.toString());
+            // based on domain?
+            if ("bydomain".equalsIgnoreCase(ignoreExternalLinksMode)) {
+              origin = URLUtil.getDomainName(originURL).toLowerCase();
+            } 
+            // use host 
+            else {
+              origin = originURL.getHost().toLowerCase();
+            }
+          }
+          
+          // used by fetchNode
+          if (fetchNode != null) {
+            fetchNode.setOutlinks(links);
+            fetchNode.setTitle(parseData.getTitle());
+            FetchNodeDb.getInstance().put(fetchNode.getUrl().toString(), fetchNode);
+          }
+          int validCount = 0;
+
+          // Process all outlinks, normalize, filter and deduplicate
+          List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore);
+          HashSet<String> outlinks = new HashSet<String>(outlinksToStore);
+          for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
+            String toUrl = links[i].getToUrl();
+
+            toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
+                origin, ignoreInternalLinks, ignoreExternalLinks, ignoreExternalLinksMode,
+                    urlFilters, urlExemptionFilters,  normalizers);
+            if (toUrl == null) {
+              continue;
+            }
+
+            validCount++;
+            links[i].setUrl(toUrl);
+            outlinkList.add(links[i]);
+            outlinks.add(toUrl);
+          }
+
+          // Only process depth N outlinks
+          if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
+            reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
+                outlinks.size());
+
+            // Counter to limit num outlinks to follow per page
+            int outlinkCounter = 0;
+
+            // Calculate variable number of outlinks by depth using the
+            // divisor (outlinks = Math.floor(divisor / depth * num.links))
+            int maxOutlinksByDepth = (int) Math.floor(outlinksDepthDivisor
+                / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
+
+            String followUrl;
+
+            // Walk over the outlinks and add as new FetchItem to the queues
+            Iterator<String> iter = outlinks.iterator();
+            while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
+              followUrl = iter.next();
+
+              // Check whether we'll follow external outlinks
+              if (outlinksIgnoreExternal) {
+                if (!URLUtil.getHost(url.toString()).equals(
+                    URLUtil.getHost(followUrl))) {
+                  continue;
+                }
+              }
+
+              reporter
+                  .incrCounter("FetcherOutlinks", "outlinks_following", 1);
+
+              // Create new FetchItem with depth incremented
+              FetchItem fit = FetchItem.create(new Text(followUrl),
+                  new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
+                  queueMode, outlinkDepth + 1);
+              ((FetchItemQueues) fetchQueues).addFetchItem(fit);
+
+              outlinkCounter++;
+            }
+          }
+
+          // Overwrite the outlinks in ParseData with the normalized and
+          // filtered set
+          parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList
+              .size()]));
+
+          output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+              parse.getText()), parseData, parse.isCanonical())));
+        }
+      }
+    } catch (IOException e) {
+      if (LOG.isErrorEnabled()) {
+        LOG.error("fetcher caught:" + e.toString());
+      }
+    }
+
+    // return parse status if it exists
+    if (parseResult != null && !parseResult.isEmpty()) {
+      Parse p = parseResult.get(content.getUrl());
+      if (p != null) {
+        reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p
+            .getData().getStatus().getMajorCode()], 1);
+        return p.getData().getStatus();
+      }
+    }
+    return null;
+  }
+  
+  private void updateStatus(int bytesInPage) throws IOException {
+    pages.incrementAndGet();
+    bytes.addAndGet(bytesInPage);
+  }
+
+  public synchronized void setHalted(boolean halted) {
+    this.halted = halted;
+  }
+
+  public synchronized boolean isHalted() {
+    return halted;
+  }
+
+  public String getReprUrl() {
+    return reprUrl;
+  }
+  
+  private void setReprUrl(String urlString) {
+    this.reprUrl = urlString;
+    
+  }
+
+}
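
For readers tracing the depth-limited outlink logic above: the per-depth cap uses
integer arithmetic, so the divisor is applied before the multiplication and the cap
drops to zero once (depth + 1) exceeds the divisor. A minimal standalone sketch of
that formula, assuming (as the arithmetic suggests) that the three configuration
fields are plain ints; it is an illustration, not code from this commit:

    public class OutlinkDepthCapDemo {
      // Mirrors: (int) Math.floor(outlinksDepthDivisor / (outlinkDepth + 1) * maxOutlinkDepthNumLinks)
      static int capForDepth(int outlinksDepthDivisor, int outlinkDepth,
          int maxOutlinkDepthNumLinks) {
        // Integer division happens first, exactly as in the fetcher code
        return (int) Math.floor(outlinksDepthDivisor / (outlinkDepth + 1)
            * maxOutlinkDepthNumLinks);
      }

      public static void main(String[] args) {
        // With divisor=2 and 100 links per page: depth 0 -> 200, depth 1 -> 100, depth 2 -> 0
        for (int depth = 0; depth <= 2; depth++) {
          System.out.println("depth " + depth + " -> cap " + capForDepth(2, depth, 100));
        }
      }
    }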

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java
new file mode 100644
index 0000000..79652e7
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class feeds the queues with input items, and re-fills them as items
+ * are consumed by FetcherThread-s.
+ */
+public class QueueFeeder extends Thread {
+  
+  private static final Logger LOG = LoggerFactory.getLogger(QueueFeeder.class);
+
+  private RecordReader<Text, CrawlDatum> reader;
+  private FetchItemQueues queues;
+  private int size;
+  private long timelimit = -1;
+
+  public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
+      FetchItemQueues queues, int size) {
+    this.reader = reader;
+    this.queues = queues;
+    this.size = size;
+    this.setDaemon(true);
+    this.setName("QueueFeeder");
+  }
+
+  public void setTimeLimit(long tl) {
+    timelimit = tl;
+  }
+
+  public void run() {
+    boolean hasMore = true;
+    int cnt = 0;
+    int timelimitcount = 0;
+    while (hasMore) {
+      if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+        // time limit reached - simply read the remaining entries
+        // from the input without queuing them
+        try {
+          Text url = new Text();
+          CrawlDatum datum = new CrawlDatum();
+          hasMore = reader.next(url, datum);
+          timelimitcount++;
+        } catch (IOException e) {
+          LOG.error("QueueFeeder error reading input, record " + cnt, e);
+          return;
+        }
+        continue;
+      }
+      int feed = size - queues.getTotalSize();
+      if (feed <= 0) {
+        // queues are full - spin-wait until they have some free space
+        try {
+          Thread.sleep(1000);
+        } catch (Exception e) {
+          // ignore interruptions while waiting for free queue space
+        }
+        continue;
+      } else {
+        LOG.debug("-feeding " + feed + " input urls ...");
+        while (feed > 0 && hasMore) {
+          try {
+            Text url = new Text();
+            CrawlDatum datum = new CrawlDatum();
+            hasMore = reader.next(url, datum);
+            if (hasMore) {
+              queues.addFetchItem(url, datum);
+              cnt++;
+              feed--;
+            }
+          } catch (IOException e) {
+            LOG.error("QueueFeeder error reading input, record " + cnt, e);
+            return;
+          }
+        }
+      }
+    }
+    LOG.info("QueueFeeder finished: total " + cnt
+        + " records + hit by time limit :" + timelimitcount);
+  }
+}
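
A hedged sketch of how a QueueFeeder is typically wired up by the fetcher; the
FetchItemQueues constructor and the capacity multiplier below are assumptions for
illustration and not part of this diff:

    // Minimal sketch, assuming FetchItemQueues(Configuration) exists in nutch-core
    // and that the reader comes from the Hadoop input split.
    static void startFeeder(RecordReader<Text, CrawlDatum> reader, Configuration conf) {
      FetchItemQueues fetchQueues = new FetchItemQueues(conf);        // assumed constructor
      int threadCount = conf.getInt("fetcher.threads.fetch", 10);
      // keep roughly a multiple of the thread count queued at any time
      QueueFeeder feeder = new QueueFeeder(reader, fetchQueues, threadCount * 50);
      feeder.setTimeLimit(System.currentTimeMillis() + 60L * 60L * 1000L); // optional absolute deadline
      feeder.start();                                                 // daemon thread named "QueueFeeder"
    }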

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html b/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html
new file mode 100644
index 0000000..9c843e0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html
@@ -0,0 +1,5 @@
+<html>
+<body>
+The Nutch robot: a multi-threaded, queue-based fetcher.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java
new file mode 100644
index 0000000..424fb1e
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map.Entry;
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable record holding per-host crawl statistics: fetch status counts,
+ * failure counters, score, homepage URL and arbitrary metadata.
+ */
+public class HostDatum implements Writable, Cloneable {
+  protected int failures = 0;
+  protected float score = 0;
+  protected Date lastCheck = new Date(0);
+  protected String homepageUrl = new String();
+
+  protected MapWritable metaData = new MapWritable();
+
+  // Records the number of times DNS look-up failed, may indicate host no longer exists
+  protected int dnsFailures = 0;
+
+  // Records the number of connection failures, may indicate our network is blocked by a firewall
+  protected int connectionFailures = 0;
+
+  protected int unfetched = 0;
+  protected int fetched = 0;
+  protected int notModified = 0;
+  protected int redirTemp = 0;
+  protected int redirPerm = 0;
+  protected int gone = 0;
+
+  public HostDatum() {
+  }
+
+  public HostDatum(float score) {
+    this(score, new Date());
+  }
+
+  public HostDatum(float score, Date lastCheck) {
+    this(score, lastCheck, new String());
+  }
+
+  public HostDatum(float score, Date lastCheck, String homepageUrl) {
+    this.score =  score;
+    this.lastCheck = lastCheck;
+    this.homepageUrl = homepageUrl;
+  }
+
+  public void resetFailures() {
+    setDnsFailures(0);
+    setConnectionFailures(0);
+  }
+
+  public void setDnsFailures(Integer dnsFailures) {
+    this.dnsFailures = dnsFailures;
+  }
+
+  public void setConnectionFailures(Integer connectionFailures) {
+    this.connectionFailures = connectionFailures;
+  }
+
+  public void incDnsFailures() {
+    this.dnsFailures++;
+  }
+
+  public void incConnectionFailures() {
+    this.connectionFailures++;
+  }
+
+  public Integer numFailures() {
+    return getDnsFailures() + getConnectionFailures();
+  }
+
+  public Integer getDnsFailures() {
+    return dnsFailures;
+  }
+
+  public Integer getConnectionFailures() {
+    return connectionFailures;
+  }
+
+  public void setScore(float score) {
+    this.score = score;
+  }
+
+  public void setLastCheck() {
+    setLastCheck(new Date());
+  }
+
+  public void setLastCheck(Date date) {
+    lastCheck = date;
+  }
+
+  public boolean isEmpty() {
+    return lastCheck.getTime() == 0;
+  }
+
+  public float getScore() {
+    return score;
+  }
+
+  public Integer numRecords() {
+    return unfetched + fetched + gone + redirPerm + redirTemp + notModified;
+  }
+
+  public Date getLastCheck() {
+    return lastCheck;
+  }
+
+  public boolean hasHomepageUrl() {
+    return homepageUrl.length() > 0;
+  }
+
+  public String getHomepageUrl() {
+    return homepageUrl;
+  }
+
+  public void setHomepageUrl(String homepageUrl) {
+    this.homepageUrl = homepageUrl;
+  }
+
+  public void setUnfetched(int val) {
+    unfetched = val;
+  }
+
+  public int getUnfetched() {
+    return unfetched;
+  }
+
+  public void setFetched(int val) {
+    fetched = val;
+  }
+
+  public int getFetched() {
+    return fetched;
+  }
+
+  public void setNotModified(int val) {
+    notModified = val;
+  }
+
+  public int getNotModified() {
+    return notModified;
+  }
+
+  public void setRedirTemp(int val) {
+    redirTemp = val;
+  }
+
+  public int getRedirTemp() {
+    return redirTemp;
+  }
+
+  public void setRedirPerm(int val) {
+    redirPerm = val;
+  }
+
+  public int getRedirPerm() {
+    return redirPerm;
+  }
+
+  public void setGone(int val) {
+    gone = val;
+  }
+
+  public int getGone() {
+    return gone;
+  }
+
+  public void resetStatistics() {
+    setUnfetched(0);
+    setFetched(0);
+    setGone(0);
+    setRedirTemp(0);
+    setRedirPerm(0);
+    setNotModified(0);
+  }
+
+  public void setMetaData(org.apache.hadoop.io.MapWritable mapWritable) {
+    this.metaData = new org.apache.hadoop.io.MapWritable(mapWritable);
+  }
+
+  /**
+   * Add all metadata from another HostDatum to this HostDatum.
+   *
+   * @param other HostDatum
+   */
+  public void putAllMetaData(HostDatum other) {
+    for (Entry<Writable, Writable> e : other.getMetaData().entrySet()) {
+      getMetaData().put(e.getKey(), e.getValue());
+    }
+  }
+
+  /**
+   * Returns the MapWritable if it was set or read via {@link #readFields(DataInput)},
+   * or an empty map if this HostDatum was freshly created (lazily instantiated).
+   */
+  public org.apache.hadoop.io.MapWritable getMetaData() {
+    if (this.metaData == null) this.metaData = new org.apache.hadoop.io.MapWritable();
+    return this.metaData;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    HostDatum result = (HostDatum)super.clone();
+    result.score = score;
+    result.lastCheck = lastCheck;
+    result.homepageUrl = homepageUrl;
+
+    result.dnsFailures = dnsFailures;
+    result.connectionFailures = connectionFailures;
+
+    result.unfetched = unfetched;
+    result.fetched = fetched;
+    result.notModified = notModified;
+    result.redirTemp = redirTemp;
+    result.redirPerm = redirPerm;
+    result.gone = gone;
+
+    result.metaData = metaData;
+
+    return result;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    score = in.readFloat();
+    lastCheck = new Date(in.readLong());
+    homepageUrl = Text.readString(in);
+
+    dnsFailures = in.readInt();
+    connectionFailures = in.readInt();
+
+    unfetched = in.readInt();
+    fetched = in.readInt();
+    notModified = in.readInt();
+    redirTemp = in.readInt();
+    redirPerm = in.readInt();
+    gone = in.readInt();
+
+    metaData = new org.apache.hadoop.io.MapWritable();
+    metaData.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeFloat(score);
+    out.writeLong(lastCheck.getTime());
+    Text.writeString(out, homepageUrl);
+
+    out.writeInt(dnsFailures);
+    out.writeInt(connectionFailures);
+
+    out.writeInt(unfetched);
+    out.writeInt(fetched);
+    out.writeInt(notModified);
+    out.writeInt(redirTemp);
+    out.writeInt(redirPerm);
+    out.writeInt(gone);
+
+    metaData.write(out);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append(Integer.toString(getUnfetched()));
+    buf.append("\t");
+    buf.append(Integer.toString(getFetched()));
+    buf.append("\t");
+    buf.append(Integer.toString(getGone()));
+    buf.append("\t");
+    buf.append(Integer.toString(getRedirTemp()));
+    buf.append("\t");
+    buf.append(Integer.toString(getRedirPerm()));
+    buf.append("\t");
+    buf.append(Integer.toString(getNotModified()));
+    buf.append("\t");
+    buf.append(Integer.toString(numRecords()));
+    buf.append("\t");
+    buf.append(Integer.toString(getDnsFailures()));
+    buf.append("\t");
+    buf.append(Integer.toString(getConnectionFailures()));
+    buf.append("\t");
+    buf.append(Integer.toString(numFailures()));
+    buf.append("\t");
+    buf.append(Float.toString(score));
+    buf.append("\t");
+    buf.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastCheck));
+    buf.append("\t");
+    buf.append(homepageUrl);
+    buf.append("\t");
+    for (Entry<Writable, Writable> e : getMetaData().entrySet()) {
+      buf.append(e.getKey().toString());
+      buf.append(':');
+      buf.append(e.getValue().toString());
+      buf.append("|||");
+    }
+    return buf.toString();
+  }
+}
\ No newline at end of file
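
Since HostDatum implements Writable, its state round-trips through the write/readFields
pair above. A small self-contained check, using only methods visible in this file
(requires nutch-core on the classpath):

    import java.io.*;
    import org.apache.nutch.hostdb.HostDatum;

    public class HostDatumRoundTrip {
      public static void main(String[] args) throws IOException {
        HostDatum in = new HostDatum(1.5f, new java.util.Date(), "http://example.org/");
        in.setFetched(42);
        in.setUnfetched(7);
        in.incDnsFailures();

        // Serialize with write(DataOutput) ...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // ... and read it back with readFields(DataInput)
        HostDatum out = new HostDatum();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Prints the tab-separated dump produced by toString()
        System.out.println(out);
      }
    }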

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java
new file mode 100644
index 0000000..240e109
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.MapContext;
+
+/**
+ * Dumps the contents of a HostDb, optionally filtered by a JEXL expression.
+ *
+ * @see <a href="http://commons.apache.org/proper/commons-jexl/reference/syntax.html">JEXL syntax</a>
+ */
+public class ReadHostDb extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ReadHostDb.class);
+
+  public static final String HOSTDB_DUMP_HOSTNAMES = "hostdb.dump.hostnames";
+  public static final String HOSTDB_DUMP_HOMEPAGES = "hostdb.dump.homepages";
+  public static final String HOSTDB_FILTER_EXPRESSION = "hostdb.filter.expression";
+
+  static class ReadHostDbMapper extends Mapper<Text, HostDatum, Text, Text> {
+    protected boolean dumpHostnames = false;
+    protected boolean dumpHomepages = false;
+    protected Text emptyText = new Text();
+    protected Expression expr = null;
+
+    public void setup(Context context) {
+      dumpHomepages = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOMEPAGES, false);
+      dumpHostnames = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOSTNAMES, false);
+      String expr = context.getConfiguration().get(HOSTDB_FILTER_EXPRESSION);
+      if (expr != null) {
+        // Create or retrieve a JexlEngine
+        JexlEngine jexl = new JexlEngine();
+        
+        // Swallow evaluation errors (silent) but evaluate strictly
+        jexl.setSilent(true);
+        jexl.setStrict(true);
+        
+        // Create an expression object
+        this.expr = jexl.createExpression(expr);
+      }
+    }
+
+    public void map(Text key, HostDatum datum, Context context) throws IOException, InterruptedException {     
+      if (expr != null) {
+        // Create a context and add data
+        JexlContext jcontext = new MapContext();
+        
+        // Set some fixed variables
+        jcontext.set("unfetched", datum.getUnfetched());
+        jcontext.set("fetched", datum.getFetched());
+        jcontext.set("gone", datum.getGone());
+        jcontext.set("redirTemp", datum.getRedirTemp());
+        jcontext.set("redirPerm", datum.getRedirPerm());
+        jcontext.set("redirs", datum.getRedirPerm() + datum.getRedirTemp());
+        jcontext.set("notModified", datum.getNotModified());
+        jcontext.set("ok", datum.getFetched() + datum.getNotModified());
+        jcontext.set("numRecords", datum.numRecords());
+        jcontext.set("dnsFailures", datum.getDnsFailures());
+        jcontext.set("connectionFailures", datum.getConnectionFailures());
+        
+        // Set metadata variables
+        for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) {
+          Object value = entry.getValue();
+          
+          if (value instanceof FloatWritable) {
+            FloatWritable fvalue = (FloatWritable)value;
+            Text tkey = (Text)entry.getKey();
+            jcontext.set(tkey.toString(), fvalue.get());
+          }
+          
+          if (value instanceof IntWritable) {
+            IntWritable ivalue = (IntWritable)value;
+            Text tkey = (Text)entry.getKey();
+            jcontext.set(tkey.toString(), ivalue.get());
+          }
+        }
+        
+        // Filter this record if evaluation did not pass
+        try {
+          if (!Boolean.TRUE.equals(expr.evaluate(jcontext))) {
+            return;
+          }
+        } catch (Exception e) {
+          LOG.info(e.toString() + " for " + key.toString());
+        }
+      }
+      
+      if (dumpHomepages) {
+        if (datum.hasHomepageUrl()) {
+          context.write(new Text(datum.getHomepageUrl()), emptyText);
+        }
+        return;
+      }
+      
+      if (dumpHostnames) {
+        context.write(key, emptyText);
+        return;
+      }
+      
+      // Write anyway
+      context.write(key, new Text(datum.toString()));
+    }
+  }
+
+  // TODO: reduce unknown hosts to a single unknown domain if possible, enabled via configuration,
+  // e.g. host_a.example.org, host_b.example.org ==> example.org
+//   static class ReadHostDbReducer extends Reduce<Text, Text, Text, Text> {
+//     public void setup(Context context) { }
+//
+//     public void reduce(Text domain, Iterable<Text> hosts, Context context) throws IOException, InterruptedException {
+//
+//     }
+//   }
+
+  private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean dumpHostnames, String expr) throws Exception {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("ReadHostDb: starting at " + sdf.format(start));
+
+    Configuration conf = getConf();
+    conf.setBoolean(HOSTDB_DUMP_HOMEPAGES, dumpHomepages);
+    conf.setBoolean(HOSTDB_DUMP_HOSTNAMES, dumpHostnames);
+    if (expr != null) {
+      conf.set(HOSTDB_FILTER_EXPRESSION, expr);
+    }
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    conf.set("mapred.textoutputformat.separator", "\t");
+    
+    Job job = new Job(conf, "ReadHostDb");
+    job.setJarByClass(ReadHostDb.class);
+
+    FileInputFormat.addInputPath(job, new Path(hostDb, "current"));
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.setMapperClass(ReadHostDbMapper.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(0);
+
+    job.waitForCompletion(true);
+
+    long end = System.currentTimeMillis();
+    LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new ReadHostDb(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: ReadHostDb <hostdb> <output> [-dumpHomepages | -dumpHostnames | -expr <expr.>]");
+      return -1;
+    }
+
+    boolean dumpHomepages = false;
+    boolean dumpHostnames = false;
+    String expr = null;
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-dumpHomepages")) {
+        LOG.info("ReadHostDb: dumping homepage URL's");
+        dumpHomepages = true;
+      }
+      if (args[i].equals("-dumpHostnames")) {
+        LOG.info("ReadHostDb: dumping hostnames");
+        dumpHostnames = true;
+      }
+      if (args[i].equals("-expr")) {
+        expr = args[i + 1];
+        LOG.info("ReadHostDb: evaluating expression: " + expr);
+        i++;
+      }
+    }
+
+    try {
+      readHostDb(new Path(args[0]), new Path(args[1]), dumpHomepages, dumpHostnames, expr);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("ReadHostDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
\ No newline at end of file
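
The -expr filter handed to ReadHostDbMapper is a plain Commons JEXL 2 expression
evaluated against the counters set in map(). A standalone sketch of that evaluation
path, using the same commons-jexl2 calls as the mapper; the sample expression and
values are illustrative only:

    import org.apache.commons.jexl2.Expression;
    import org.apache.commons.jexl2.JexlContext;
    import org.apache.commons.jexl2.JexlEngine;
    import org.apache.commons.jexl2.MapContext;

    public class HostDbExpressionDemo {
      public static void main(String[] args) {
        JexlEngine jexl = new JexlEngine();
        jexl.setSilent(true);   // swallow evaluation errors, as the mapper does
        jexl.setStrict(true);   // but evaluate strictly

        // e.g. keep hosts with repeated DNS failures and nothing fetched yet
        Expression expr = jexl.createExpression("dnsFailures > 2 && fetched == 0");

        JexlContext ctx = new MapContext();
        ctx.set("dnsFailures", 3);   // in the mapper these come from a HostDatum
        ctx.set("fetched", 0);

        System.out.println(Boolean.TRUE.equals(expr.evaluate(ctx))); // prints: true
      }
    }

From the command line the same expression would be passed via the -expr option parsed
in run(), e.g.: ReadHostDb <hostdb> <output> -expr "dnsFailures > 2 && fetched == 0"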

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java
new file mode 100644
index 0000000..e7c7978
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple Runnable that performs a DNS lookup for a single host and updates its HostDatum.
+ */
+public class ResolverThread implements Runnable {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ResolverThread.class);
+
+  protected String host = null;
+  protected HostDatum datum = null;
+  protected Text hostText = new Text();
+  protected OutputCollector<Text,HostDatum> output;
+  protected Reporter reporter;
+  protected int purgeFailedHostsThreshold;
+
+  /**
+   * Constructor.
+   */
+  public ResolverThread(String host, HostDatum datum,
+    OutputCollector<Text,HostDatum> output, Reporter reporter, int purgeFailedHostsThreshold) {
+
+    hostText.set(host);
+    this.host = host;
+    this.datum = datum;
+    this.output = output;
+    this.reporter = reporter;
+    this.purgeFailedHostsThreshold = purgeFailedHostsThreshold;
+  }
+
+  /**
+   * Resolves the host, updates its {@link HostDatum} and increments the
+   * relevant UpdateHostDb counters.
+   */
+  public void run() {
+    // Resolve the host and act appropriately
+    try {
+      // Throws an exception if host is not found
+      InetAddress inetAddr = InetAddress.getByName(host);
+
+      if (datum.isEmpty()) {
+        reporter.incrCounter("UpdateHostDb", "new_known_host" ,1);
+        datum.setLastCheck();
+        LOG.info(host + ": new_known_host " + datum);
+      } else if (datum.getDnsFailures() > 0) {
+        reporter.incrCounter("UpdateHostDb", "rediscovered_host" ,1);
+        datum.setLastCheck();
+        datum.setDnsFailures(0);
+        LOG.info(host + ": rediscovered_host " + datum);
+      } else {
+        reporter.incrCounter("UpdateHostDb", "existing_known_host", 1);
+        datum.setLastCheck();
+        LOG.info(host + ": existing_known_host " + datum);
+      }
+
+      // Write the host datum
+      output.collect(hostText, datum);
+    } catch (UnknownHostException e) {
+      try {
+        // If the counter is empty we'll initialize with date = today and 1 failure
+        if (datum.isEmpty()) {
+          datum.setLastCheck();
+          datum.setDnsFailures(1);
+          output.collect(hostText, datum);
+          reporter.incrCounter("UpdateHostDb", "new_unknown_host", 1);
+          LOG.info(host + ": new_unknown_host " + datum);
+        } else {
+          datum.setLastCheck();
+          datum.incDnsFailures();
+
+          // Check if this host should be forgotten
+          if (purgeFailedHostsThreshold == -1 ||
+            purgeFailedHostsThreshold < datum.getDnsFailures()) {
+
+            output.collect(hostText, datum);
+            reporter.incrCounter("UpdateHostDb", "existing_unknown_host" ,1);
+            LOG.info(host + ": existing_unknown_host " + datum);
+          } else {
+            reporter.incrCounter("UpdateHostDb", "purged_unknown_host" ,1);
+            LOG.info(host + ": purged_unknown_host " + datum);
+          }
+        }
+
+        reporter.incrCounter("UpdateHostDb",
+          Integer.toString(datum.numFailures()) + "_times_failed", 1);
+      } catch (Exception ioe) {
+        LOG.warn(StringUtils.stringifyException(ioe));
+      }
+    } catch (Exception e) {
+      LOG.warn(StringUtils.stringifyException(e));
+    }
+    
+    reporter.incrCounter("UpdateHostDb", "checked_hosts", 1);
+  }
+}
\ No newline at end of file
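
ResolverThread is a plain Runnable, so the caller (UpdateHostDb in the full job)
hands it an OutputCollector and a Reporter and submits it to a thread pool. A
hedged sketch under those assumptions, using Reporter.NULL and a print-to-console
collector purely for illustration:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.nutch.hostdb.HostDatum;
    import org.apache.nutch.hostdb.ResolverThread;

    public class ResolverThreadDemo {
      public static void main(String[] args) throws InterruptedException {
        // Print resolved hosts instead of writing to a real MapReduce sink
        OutputCollector<Text, HostDatum> collector = new OutputCollector<Text, HostDatum>() {
          public void collect(Text key, HostDatum value) {
            System.out.println(key + "\t" + value);
          }
        };

        ExecutorService pool = Executors.newFixedThreadPool(4);
        // last argument is purgeFailedHostsThreshold; -1 disables purging (see run() above)
        pool.execute(new ResolverThread("example.org", new HostDatum(),
            collector, Reporter.NULL, -1));

        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
      }
    }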