You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:49:03 UTC

[47/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java
new file mode 100644
index 0000000..39923ac
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * This tool merges several LinkDb-s into one, optionally filtering URLs through
+ * the current URLFilters, to skip prohibited URLs and links.
+ * 
+ * <p>
+ * It's possible to use this tool just for filtering - in that case only one
+ * LinkDb should be specified in arguments.
+ * </p>
+ * <p>
+ * If more than one LinkDb contains information about the same URL, all inlinks
+ * are accumulated, but only at most <code>linkdb.max.inlinks</code> inlinks will
+ * ever be added.
+ * </p>
+ * <p>
+ * If activated, URLFilters will be applied to both the target URLs and to any
+ * incoming link URL. If a target URL is prohibited, all inlinks to that target
+ * will be removed, including the target URL. If some of incoming links are
+ * prohibited, only they will be removed, and they won't count when checking the
+ * above-mentioned maximum limit.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class LinkDbMerger extends Configured implements Tool,
+    Reducer<Text, Inlinks, Text, Inlinks> {
+  private static final Logger LOG = LoggerFactory.getLogger(LinkDbMerger.class);
+
+  private int maxInlinks;
+
+  public LinkDbMerger() {
+
+  }
+
+  public LinkDbMerger(Configuration conf) {
+    setConf(conf);
+  }
+
+  public void reduce(Text key, Iterator<Inlinks> values,
+      OutputCollector<Text, Inlinks> output, Reporter reporter)
+      throws IOException {
+    // Accumulate the inlinks of every incoming Inlinks value for this key.
+    // Each batch may contribute at most the room left under maxInlinks,
+    // measured when that batch starts.
+    Inlinks merged = new Inlinks();
+    while (values.hasNext()) {
+      Inlinks batch = values.next();
+      int quota = Math.min(maxInlinks - merged.size(), batch.size());
+      Iterator<Inlink> links = batch.iterator();
+      for (int taken = 0; links.hasNext() && taken < quota; taken++) {
+        merged.add(links.next());
+      }
+    }
+
+    // Emit nothing for a key that ended up without any inlinks.
+    if (merged.size() != 0) {
+      output.collect(key, merged);
+    }
+  }
+
+  public void configure(JobConf job) {
+    maxInlinks = job.getInt("linkdb.max.inlinks", 10000);
+  }
+
+  public void close() throws IOException {
+  }
+
+  /** Merges dbs into output/current; throws if the final rename reports failure. */
+  public void merge(Path output, Path[] dbs, boolean normalize, boolean filter)
+      throws Exception {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("LinkDb merge: starting at " + sdf.format(start));
+    JobConf job = createMergeJob(getConf(), output, normalize, filter);
+    for (Path db : dbs) {
+      FileInputFormat.addInputPath(job, new Path(db, LinkDb.CURRENT_NAME));
+    }
+    JobClient.runJob(job);
+    FileSystem fs = FileSystem.get(getConf());
+    fs.mkdirs(output);
+    Path merged = FileOutputFormat.getOutputPath(job);
+    if (!fs.rename(merged, new Path(output, LinkDb.CURRENT_NAME)))
+      throw new IOException("LinkDb merge: cannot move " + merged + " into " + output);
+    long end = System.currentTimeMillis();
+    LOG.info("LinkDb merge: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static JobConf createMergeJob(Configuration config, Path linkDb,
+      boolean normalize, boolean filter) {
+    Path newLinkDb = new Path("linkdb-merge-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf job = new NutchJob(config);
+    job.setJobName("linkdb merge " + linkDb);
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(LinkDbFilter.class);
+    job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
+    job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
+    job.setReducerClass(LinkDbMerger.class);
+
+    FileOutputFormat.setOutputPath(job, newLinkDb);
+    job.setOutputFormat(MapFileOutputFormat.class);
+    job.setBoolean("mapred.output.compress", true);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Inlinks.class);
+
+    // https://issues.apache.org/jira/browse/NUTCH-1069
+    job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+    return job;
+  }
+
+  /**
+   * @param args
+   */
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbMerger(),
+        args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err
+          .println("Usage: LinkDbMerger <output_linkdb> <linkdb1> [<linkdb2> <linkdb3> ...] [-normalize] [-filter]");
+      System.err.println("\toutput_linkdb\toutput LinkDb");
+      System.err
+          .println("\tlinkdb1 ...\tinput LinkDb-s (single input LinkDb is ok)");
+      System.err
+          .println("\t-normalize\tuse URLNormalizer on both fromUrls and toUrls in linkdb(s) (usually not needed)");
+      System.err
+          .println("\t-filter\tuse URLFilters on both fromUrls and toUrls in linkdb(s)");
+      return -1;
+    }
+    Path output = new Path(args[0]);
+    ArrayList<Path> dbs = new ArrayList<Path>();
+    boolean normalize = false;
+    boolean filter = false;
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-filter")) {
+        filter = true;
+      } else if (args[i].equals("-normalize")) {
+        normalize = true;
+      } else
+        dbs.add(new Path(args[i]));
+    }
+    try {
+      merge(output, dbs.toArray(new Path[dbs.size()]), normalize, filter);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("LinkDbMerger: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java
new file mode 100644
index 0000000..2e50e9a
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// SLF4J logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+import java.io.Closeable;
+
+/** Command-line tool and API for reading the inlinks stored in a LinkDb. */
+public class LinkDbReader extends Configured implements Tool, Closeable {
+  public static final Logger LOG = LoggerFactory.getLogger(LinkDbReader.class);
+
+  private static final Partitioner<WritableComparable, Writable> PARTITIONER = new HashPartitioner<WritableComparable, Writable>();
+
+  private FileSystem fs;
+  private Path directory;
+  private MapFile.Reader[] readers;
+
+  public LinkDbReader() {
+
+  }
+
+  public LinkDbReader(Configuration conf, Path directory) throws Exception {
+    setConf(conf);
+    init(directory);
+  }
+
+  public void init(Path directory) throws Exception {
+    this.fs = FileSystem.get(getConf());
+    this.directory = directory;
+  }
+
+  public String[] getAnchors(Text url) throws IOException {
+    Inlinks inlinks = getInlinks(url);
+    if (inlinks == null)
+      return null;
+    return inlinks.getAnchors();
+  }
+
+  /** Returns the entry stored for url (may be null); readers open on first use. */
+  public Inlinks getInlinks(Text url) throws IOException {
+    // Test and assign under one lock so racing callers cannot open readers twice.
+    synchronized (this) {
+      if (readers == null) {
+        readers = MapFileOutputFormat.getReaders(fs, new Path(directory,
+            LinkDb.CURRENT_NAME), getConf());
+      }
+    }
+    return (Inlinks) MapFileOutputFormat.getEntry(readers, PARTITIONER, url,
+        new Inlinks());
+  }
+
+  public void close() throws IOException {
+    if (readers != null) {
+      for (int i = 0; i < readers.length; i++) {
+        readers[i].close();
+      }
+    }
+  }
+  
+  public static class LinkDBDumpMapper implements Mapper<Text, Inlinks, Text, Inlinks> {
+    Pattern pattern = null;
+    Matcher matcher = null;
+    
+    public void configure(JobConf job) {
+      if (job.get("linkdb.regex", null) != null) {
+        pattern = Pattern.compile(job.get("linkdb.regex"));
+      }
+    }
+
+    public void close() {}
+    public void map(Text key, Inlinks value, OutputCollector<Text, Inlinks> output, Reporter reporter)
+            throws IOException {
+
+      if (pattern != null) {
+        matcher = pattern.matcher(key.toString());
+        if (!matcher.matches()) {
+          return;
+        }
+      }
+
+      output.collect(key, value);
+    }
+  }
+
+  public void processDumpJob(String linkdb, String output, String regex) throws IOException {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("LinkDb dump: starting at " + sdf.format(start));
+      LOG.info("LinkDb dump: db: " + linkdb);
+    }
+    Path outFolder = new Path(output);
+
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("read " + linkdb);
+    
+    if (regex != null) {
+      job.set("linkdb.regex", regex);
+      job.setMapperClass(LinkDBDumpMapper.class);
+    }
+
+    FileInputFormat.addInputPath(job, new Path(linkdb, LinkDb.CURRENT_NAME));
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    FileOutputFormat.setOutputPath(job, outFolder);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Inlinks.class);
+
+    JobClient.runJob(job);
+
+    long end = System.currentTimeMillis();
+    LOG.info("LinkDb dump: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbReader(),
+        args);
+    System.exit(res);
+  }
+
+  /** Tool entry point: "-dump" writes the LinkDb as text, "-url" prints one URL's inlinks. */
+  public int run(String[] args) throws Exception {
+    // Both modes read args[2] (<out_dir> or <url>), so three arguments are the
+    // minimum. The old "< 2" check let two-argument calls through to an
+    // ArrayIndexOutOfBoundsException instead of the usage text.
+    if (args.length < 3) {
+      System.err.println("Usage: LinkDbReader <linkdb> (-dump <out_dir> [-regex <regex>]) | -url <url>");
+      System.err.println("\t-dump <out_dir>\tdump whole link db to a text file in <out_dir>");
+      System.err.println("\t\t-regex <regex>\trestrict to url's matching expression");
+      System.err.println("\t-url <url>\tprint information about <url> to System.out");
+      return -1;
+    }
+    try {
+      if (args[1].equals("-dump")) {
+        String regex = null;
+        for (int i = 2; i < args.length; i++) {
+          if (args[i].equals("-regex")) {
+            regex = args[++i];
+          }
+        }
+        processDumpJob(args[0], args[2], regex);
+        return 0;
+      } else if (args[1].equals("-url")) {
+        init(new Path(args[0]));
+        Inlinks links = getInlinks(new Text(args[2]));
+        if (links == null) {
+          System.out.println(" - no link information.");
+        } else {
+          Iterator<Inlink> it = links.iterator();
+          while (it.hasNext()) {
+            System.out.println(it.next().toString());
+          }
+        }
+        return 0;
+      } else {
+        System.err.println("Error: wrong argument " + args[1]);
+        return -1;
+      }
+    } catch (Exception e) {
+      LOG.error("LinkDbReader: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java
new file mode 100644
index 0000000..f6ec8dd
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Default implementation of a page signature. It calculates an MD5 hash of the
+ * raw binary content of a page. In case there is no content, it calculates a
+ * hash from the page's URL.
+ * 
+ * @author Andrzej Bialecki &lt;ab@getopt.org&gt;
+ */
+public class MD5Signature extends Signature {
+  /** MD5 of the raw content; without content, MD5 of the URL's UTF-8 bytes. */
+  public byte[] calculate(Content content, Parse parse) {
+    byte[] data = content.getContent();
+    if (data == null) // fixed charset keeps signatures equal across platforms
+      data = content.getUrl().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    return MD5Hash.digest(data).getDigest();
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java b/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java
new file mode 100644
index 0000000..4fe5cef
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.HashMap;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.metadata.HttpHeaders;
+import org.apache.nutch.util.MimeUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of {@link AdaptiveFetchSchedule} that allows for more flexible
+ * configuration of DEC and INC factors for various MIME-types.
+ * 
+ * This class can be typically used in cases where a recrawl consists of many
+ * different MIME-types. It's not very common for MIME-types other than
+ * text/html to change frequently. Using this class you can configure different
+ * factors per MIME-type so to prefer frequently changing MIME-types over
+ * others.
+ * 
+ * For it to work this class relies on the Content-Type MetaData key being
+ * present in the CrawlDB. This can either be done when injecting new URL's or
+ * by adding "Content-Type" to the db.parsemeta.to.crawldb configuration setting
+ * to force MIME-types of newly discovered URL's to be added to the CrawlDB.
+ * 
+ * @author markus
+ */
+public class MimeAdaptiveFetchSchedule extends AdaptiveFetchSchedule {
+  // Logger
+  public static final Logger LOG = LoggerFactory
+      .getLogger(MimeAdaptiveFetchSchedule.class);
+
+  // Conf directives
+  public static final String SCHEDULE_INC_RATE = "db.fetch.schedule.adaptive.inc_rate";
+  public static final String SCHEDULE_DEC_RATE = "db.fetch.schedule.adaptive.dec_rate";
+  public static final String SCHEDULE_MIME_FILE = "db.fetch.schedule.mime.file";
+
+  // Default values for DEC and INC rate
+  private float defaultIncRate;
+  private float defaultDecRate;
+
+  // Structure to store inc and dec rates per MIME-type
+  private class AdaptiveRate {
+    public float inc;
+    public float dec;
+
+    public AdaptiveRate(Float inc, Float dec) {
+      this.inc = inc;
+      this.dec = dec;
+    }
+  }
+
+  // Here we store the mime's and their delta's
+  private HashMap<String, AdaptiveRate> mimeMap;
+
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf == null)
+      return;
+
+    // Read and set the default INC and DEC rates in case we cannot set values
+    // based on MIME-type
+    defaultIncRate = conf.getFloat(SCHEDULE_INC_RATE, 0.2f);
+    defaultDecRate = conf.getFloat(SCHEDULE_DEC_RATE, 0.2f);
+
+    // Where's the mime/factor file?
+    Reader mimeFile = conf.getConfResourceAsReader(conf.get(SCHEDULE_MIME_FILE,
+        "adaptive-mimetypes.txt"));
+
+    try {
+      readMimeFile(mimeFile);
+    } catch (IOException e) {
+      LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+  }
+
+  @Override
+  public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum,
+      long prevFetchTime, long prevModifiedTime, long fetchTime,
+      long modifiedTime, int state) {
+
+    // Set defaults
+    INC_RATE = defaultIncRate;
+    DEC_RATE = defaultDecRate;
+
+    // Check if the Content-Type field is available in the CrawlDatum
+    if (datum.getMetaData().containsKey(HttpHeaders.WRITABLE_CONTENT_TYPE)) {
+      // Get the MIME-type of the current URL
+      String currentMime = MimeUtil.cleanMimeType(datum.getMetaData()
+          .get(HttpHeaders.WRITABLE_CONTENT_TYPE).toString());
+
+      // Check if this MIME-type exists in our map
+      if (mimeMap.containsKey(currentMime)) {
+        // Yes, set the INC and DEC rates for this MIME-type
+        INC_RATE = mimeMap.get(currentMime).inc;
+        DEC_RATE = mimeMap.get(currentMime).dec;
+      }
+    }
+
+    return super.setFetchSchedule(url, datum, prevFetchTime, prevModifiedTime,
+        fetchTime, modifiedTime, state);
+  }
+
+  /**
+   * Reads the MIME-types and their INC/DEC factors into the mimeMap. Each
+   * usable line holds three TAB-separated fields: MIME-type, INC and DEC.
+   * 
+   * @param mimeFile
+   *          reader over the mapping file, closed here; null leaves map empty
+   */
+  private void readMimeFile(Reader mimeFile) throws IOException {
+    mimeMap = new HashMap<String, AdaptiveRate>();
+    if (mimeFile == null) {
+      // The configured resource was not found: keep using the default rates
+      LOG.warn("MIME-type rate file not found, using default rates only");
+      return;
+    }
+    BufferedReader reader = new BufferedReader(mimeFile);
+    try {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        // Skip blank lines and comments
+        if (StringUtils.isBlank(line) || line.startsWith("#"))
+          continue;
+        String[] splits = line.split("\t");
+        // Sanity check, we need exactly three items
+        if (splits.length == 3) {
+          // Add a lower cased MIME-type and its factors to the map
+          mimeMap.put(StringUtils.lowerCase(splits[0]), new AdaptiveRate(
+              Float.valueOf(splits[1]), Float.valueOf(splits[2])));
+        } else {
+          LOG.warn("Invalid configuration line in: " + line);
+        }
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    FetchSchedule fs = new MimeAdaptiveFetchSchedule();
+    fs.setConf(NutchConfiguration.create());
+    // we start the time at 0, for simplicity
+    long curTime = 0;
+    long delta = 1000L * 3600L * 24L; // 24 hours
+    // we trigger the update of the page every 30 days
+    long update = 1000L * 3600L * 24L * 30L; // 30 days
+    boolean changed = true;
+    long lastModified = 0;
+    int miss = 0;
+    int totalMiss = 0;
+    int maxMiss = 0;
+    int fetchCnt = 0;
+    int changeCnt = 0;
+
+    // initial fetchInterval is 30 days (3600 * 24 * 30 seconds)
+    CrawlDatum p = new CrawlDatum(1, 3600 * 24 * 30, 1.0f);
+
+    // Set a default MIME-type to test with
+    org.apache.hadoop.io.MapWritable x = new org.apache.hadoop.io.MapWritable();
+    x.put(HttpHeaders.WRITABLE_CONTENT_TYPE, new Text(
+        "text/html; charset=utf-8"));
+    p.setMetaData(x);
+
+    p.setFetchTime(0);
+    LOG.info(p.toString());
+
+    // let's move the timeline a couple of deltas
+    for (int i = 0; i < 10000; i++) {
+      if (lastModified + update < curTime) {
+        // System.out.println("i=" + i + ", lastModified=" + lastModified +
+        // ", update=" + update + ", curTime=" + curTime);
+        changed = true;
+        changeCnt++;
+        lastModified = curTime;
+      }
+
+      LOG.info(i + ". " + changed + "\twill fetch at "
+          + (p.getFetchTime() / delta) + "\tinterval "
+          + (p.getFetchInterval() / SECONDS_PER_DAY) + " days" + "\t missed "
+          + miss);
+
+      if (p.getFetchTime() <= curTime) {
+        fetchCnt++;
+        fs.setFetchSchedule(new Text("http://www.example.com"), p, p
+            .getFetchTime(), p.getModifiedTime(), curTime, lastModified,
+            changed ? FetchSchedule.STATUS_MODIFIED
+                : FetchSchedule.STATUS_NOTMODIFIED);
+
+        LOG.info("\tfetched & adjusted: " + "\twill fetch at "
+            + (p.getFetchTime() / delta) + "\tinterval "
+            + (p.getFetchInterval() / SECONDS_PER_DAY) + " days");
+
+        if (!changed)
+          miss++;
+        if (miss > maxMiss)
+          maxMiss = miss;
+        changed = false;
+        totalMiss += miss;
+        miss = 0;
+      }
+
+      if (changed)
+        miss++;
+      curTime += delta;
+    }
+    LOG.info("Total missed: " + totalMiss + ", max miss: " + maxMiss);
+    LOG.info("Page changed " + changeCnt + " times, fetched " + fetchCnt
+        + " times.");
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java b/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java
new file mode 100644
index 0000000..589b8b9
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.nutch.util.GenericWritableConfigurable;
+
+@SuppressWarnings("unchecked")
+public class NutchWritable extends GenericWritableConfigurable {
+
+  private static Class<? extends Writable>[] CLASSES = null;
+
+  static {
+    CLASSES = (Class<? extends Writable>[]) new Class<?>[] {
+        org.apache.hadoop.io.NullWritable.class,
+        org.apache.hadoop.io.BooleanWritable.class,
+        org.apache.hadoop.io.LongWritable.class,
+        org.apache.hadoop.io.ByteWritable.class,
+        org.apache.hadoop.io.BytesWritable.class,
+        org.apache.hadoop.io.FloatWritable.class,
+        org.apache.hadoop.io.IntWritable.class,
+        org.apache.hadoop.io.MapWritable.class,
+        org.apache.hadoop.io.Text.class, org.apache.hadoop.io.MD5Hash.class,
+        org.apache.nutch.crawl.CrawlDatum.class,
+        org.apache.nutch.crawl.Inlink.class,
+        org.apache.nutch.crawl.Inlinks.class,
+        org.apache.nutch.indexer.NutchIndexAction.class,
+        org.apache.nutch.metadata.Metadata.class,
+        org.apache.nutch.parse.Outlink.class,
+        org.apache.nutch.parse.ParseText.class,
+        org.apache.nutch.parse.ParseData.class,
+        org.apache.nutch.parse.ParseImpl.class,
+        org.apache.nutch.parse.ParseStatus.class,
+        org.apache.nutch.protocol.Content.class,
+        org.apache.nutch.protocol.ProtocolStatus.class,
+        org.apache.nutch.scoring.webgraph.LinkDatum.class,
+        org.apache.nutch.hostdb.HostDatum.class };
+  }
+
+  public NutchWritable() {
+  }
+
+  public NutchWritable(Writable instance) {
+    set(instance);
+  }
+
+  @Override
+  protected Class<? extends Writable>[] getTypes() {
+    return CLASSES;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java
new file mode 100644
index 0000000..21dfe07
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.protocol.Content;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+
+public abstract class Signature implements Configurable {
+  protected Configuration conf;
+
+  public abstract byte[] calculate(Content content, Parse parse);
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java
new file mode 100644
index 0000000..d217d93
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.util.Comparator;
+
+/**
+ * Orders page signatures, which are expected to be <code>byte[]</code>
+ * instances. The resulting order is: <code>null</code> first, then objects
+ * that are not byte arrays (all considered equal to each other), then byte
+ * arrays ordered by length and, for equal lengths, by their content.
+ */
+public class SignatureComparator implements Comparator<Object> {
+
+  /** Instance entry point; delegates to {@link #_compare(Object, Object)}. */
+  public int compare(Object o1, Object o2) {
+    return _compare(o1, o2);
+  }
+
+  /**
+   * Compares two signature objects.
+   *
+   * @param o1
+   *          first object, may be <code>null</code>
+   * @param o2
+   *          second object, may be <code>null</code>
+   * @return a negative value, zero or a positive value if <code>o1</code>
+   *         sorts before, together with, or after <code>o2</code>
+   */
+  public static int _compare(Object o1, Object o2) {
+    if (o1 == null && o2 == null) {
+      return 0;
+    }
+    if (o1 == null) {
+      return -1;
+    }
+    if (o2 == null) {
+      return 1;
+    }
+    boolean firstIsBytes = o1 instanceof byte[];
+    boolean secondIsBytes = o2 instanceof byte[];
+    // Two objects that are both not byte arrays cannot be told apart here.
+    // Reporting them as equal keeps compare(a, b) and compare(b, a)
+    // consistent, as the Comparator contract requires.
+    if (!firstIsBytes && !secondIsBytes) {
+      return 0;
+    }
+    if (!firstIsBytes) {
+      return -1;
+    }
+    if (!secondIsBytes) {
+      return 1;
+    }
+    byte[] data1 = (byte[]) o1;
+    byte[] data2 = (byte[]) o2;
+    return _compare(data1, 0, data1.length, data2, 0, data2.length);
+  }
+
+  /**
+   * Compares two byte ranges. A shorter range always sorts before a longer
+   * one; ranges of equal length are compared byte by byte (as signed values)
+   * and the first difference decides.
+   *
+   * @param data1
+   *          array holding the first range
+   * @param s1
+   *          offset of the first range in <code>data1</code>
+   * @param l1
+   *          length of the first range
+   * @param data2
+   *          array holding the second range
+   * @param s2
+   *          offset of the second range in <code>data2</code>
+   * @param l2
+   *          length of the second range
+   * @return a negative value, zero or a positive value if the first range
+   *         sorts before, together with, or after the second one
+   */
+  public static int _compare(byte[] data1, int s1, int l1, byte[] data2,
+      int s2, int l2) {
+    if (l2 > l1) {
+      return -1;
+    }
+    if (l2 < l1) {
+      return 1;
+    }
+    for (int i = 0; i < l1; i++) {
+      // byte - byte is widened to int, so the subtraction cannot overflow
+      int res = data1[s1 + i] - data2[s2 + i];
+      if (res != 0) {
+        return res;
+      }
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java
new file mode 100644
index 0000000..16d8cc0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+// SLF4J logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Hadoop imports
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.ObjectCache;
+
+/**
+ * Factory for {@link Signature} implementations. The implementation class is
+ * named by the <code>db.signature.class</code> property and defaults to
+ * {@link MD5Signature}. A created instance is stored in the
+ * {@link ObjectCache} obtained for the given Configuration, keyed by its class
+ * name, so that later lookups return the same object.
+ *
+ * @author Andrzej Bialecki &lt;ab@getopt.org&gt;
+ */
+public class SignatureFactory {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SignatureFactory.class);
+
+  /** Not instantiable; use {@link #getSignature(Configuration)}. */
+  private SignatureFactory() {
+  }
+
+  /**
+   * Returns the Signature implementation configured in <code>conf</code>,
+   * creating and caching it on first use.
+   *
+   * @param conf
+   *          configuration that names the implementation and is passed to the
+   *          new instance
+   * @return the cached or newly created implementation
+   * @throws RuntimeException
+   *           if the class cannot be loaded, instantiated or configured
+   */
+  public synchronized static Signature getSignature(Configuration conf) {
+    final String className = conf.get("db.signature.class",
+        MD5Signature.class.getName());
+    final ObjectCache cache = ObjectCache.get(conf);
+    final Signature cached = (Signature) cache.getObject(className);
+    if (cached != null) {
+      return cached;
+    }
+    try {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Using Signature impl: " + className);
+      }
+      final Signature created = (Signature) Class.forName(className)
+          .newInstance();
+      created.setConf(conf);
+      cache.setObject(className, created);
+      return created;
+    } catch (Exception e) {
+      throw new RuntimeException("Couldn't create " + className, e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java
new file mode 100644
index 0000000..b88cfa6
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Implementation of a page signature. It calculates an MD5 hash of the textual
+ * content of a page. In case there is no text (no parse at all, or a parse
+ * whose text is <code>null</code> or empty), the calculation is delegated to
+ * {@link MD5Signature}.
+ */
+public class TextMD5Signature extends Signature {
+
+  /** Used whenever the page has no text to hash. */
+  Signature fallback = new MD5Signature();
+
+  /**
+   * Calculates the signature of a page from its parsed text.
+   *
+   * @param content
+   *          the page content; only handed on to the fallback signature
+   * @param parse
+   *          the parse result providing the text, may be <code>null</code>
+   * @return the MD5 digest of the text, or the fallback signature if there is
+   *         no text
+   */
+  @Override
+  public byte[] calculate(Content content, Parse parse) {
+    // a missing parse is handled like missing text instead of failing with a
+    // NullPointerException
+    String text = (parse == null) ? null : parse.getText();
+
+    if (text == null || text.length() == 0) {
+      return fallback.calculate(content, parse);
+    }
+
+    return MD5Hash.digest(text).getDigest();
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java
new file mode 100644
index 0000000..5d930f9
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * <p>
+ * An implementation of a page signature. It calculates an MD5 hash of a plain
+ * text "profile" of a page. In case there is no text, it calculates a hash
+ * using the {@link MD5Signature}.
+ * </p>
+ * <p>
+ * The algorithm to calculate a page "profile" takes the plain text version of a
+ * page and performs the following steps:
+ * <ul>
+ * <li>remove all characters except letters and digits, and bring all characters
+ * to lower case,</li>
+ * <li>split the text into tokens (all consecutive non-whitespace characters),</li>
+ * <li>discard tokens equal or shorter than MIN_TOKEN_LEN (default 2
+ * characters),</li>
+ * <li>sort the list of tokens by decreasing frequency,</li>
+ * <li>round down the counts of tokens to the nearest multiple of QUANT (
+ * <code>QUANT = QUANT_RATE * maxFreq</code>, where <code>QUANT_RATE</code> is
+ * 0.01f by default, and <code>maxFreq</code> is the maximum token frequency).
+ * If <code>maxFreq</code> is higher than 1, then QUANT is always higher than 2
+ * (which means that tokens with frequency 1 are always discarded).</li>
+ * <li>tokens, which frequency after quantization falls below QUANT, are
+ * discarded.</li>
+ * <li>create a list of tokens and their quantized frequency, separated by
+ * spaces, in the order of decreasing frequency.</li>
+ * </ul>
+ * This list is then submitted to an MD5 hash calculation.
+ * 
+ * @author Andrzej Bialecki &lt;ab@getopt.org&gt;
+ */
+public class TextProfileSignature extends Signature {
+
+  Signature fallback = new MD5Signature();
+
+  public byte[] calculate(Content content, Parse parse) {
+    int MIN_TOKEN_LEN = getConf().getInt(
+        "db.signature.text_profile.min_token_len", 2);
+    float QUANT_RATE = getConf().getFloat(
+        "db.signature.text_profile.quant_rate", 0.01f);
+    HashMap<String, Token> tokens = new HashMap<String, Token>();
+    String text = null;
+    if (parse != null)
+      text = parse.getText();
+    if (text == null || text.length() == 0)
+      return fallback.calculate(content, parse);
+    StringBuffer curToken = new StringBuffer();
+    int maxFreq = 0;
+    for (int i = 0; i < text.length(); i++) {
+      char c = text.charAt(i);
+      if (Character.isLetterOrDigit(c)) {
+        curToken.append(Character.toLowerCase(c));
+      } else {
+        if (curToken.length() > 0) {
+          if (curToken.length() > MIN_TOKEN_LEN) {
+            // add it
+            String s = curToken.toString();
+            Token tok = tokens.get(s);
+            if (tok == null) {
+              tok = new Token(0, s);
+              tokens.put(s, tok);
+            }
+            tok.cnt++;
+            if (tok.cnt > maxFreq)
+              maxFreq = tok.cnt;
+          }
+          curToken.setLength(0);
+        }
+      }
+    }
+    // check the last token
+    if (curToken.length() > MIN_TOKEN_LEN) {
+      // add it
+      String s = curToken.toString();
+      Token tok = tokens.get(s);
+      if (tok == null) {
+        tok = new Token(0, s);
+        tokens.put(s, tok);
+      }
+      tok.cnt++;
+      if (tok.cnt > maxFreq)
+        maxFreq = tok.cnt;
+    }
+    Iterator<Token> it = tokens.values().iterator();
+    ArrayList<Token> profile = new ArrayList<Token>();
+    // calculate the QUANT value
+    int QUANT = Math.round(maxFreq * QUANT_RATE);
+    if (QUANT < 2) {
+      if (maxFreq > 1)
+        QUANT = 2;
+      else
+        QUANT = 1;
+    }
+    while (it.hasNext()) {
+      Token t = it.next();
+      // round down to the nearest QUANT
+      t.cnt = (t.cnt / QUANT) * QUANT;
+      // discard the frequencies below the QUANT
+      if (t.cnt < QUANT) {
+        continue;
+      }
+      profile.add(t);
+    }
+    Collections.sort(profile, new TokenComparator());
+    StringBuffer newText = new StringBuffer();
+    it = profile.iterator();
+    while (it.hasNext()) {
+      Token t = it.next();
+      if (newText.length() > 0)
+        newText.append("\n");
+      newText.append(t.toString());
+    }
+    return MD5Hash.digest(newText.toString()).getDigest();
+  }
+
+  private static class Token {
+    public int cnt;
+    public String val;
+
+    public Token(int cnt, String val) {
+      this.cnt = cnt;
+      this.val = val;
+    }
+
+    public String toString() {
+      return val + " " + cnt;
+    }
+  }
+
+  private static class TokenComparator implements Comparator<Token> {
+    public int compare(Token t1, Token t2) {
+      return t2.cnt - t1.cnt;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TextProfileSignature sig = new TextProfileSignature();
+    sig.setConf(NutchConfiguration.create());
+    HashMap<String, byte[]> res = new HashMap<String, byte[]>();
+    File[] files = new File(args[0]).listFiles();
+    for (int i = 0; i < files.length; i++) {
+      FileInputStream fis = new FileInputStream(files[i]);
+      BufferedReader br = new BufferedReader(
+          new InputStreamReader(fis, "UTF-8"));
+      StringBuffer text = new StringBuffer();
+      String line = null;
+      while ((line = br.readLine()) != null) {
+        if (text.length() > 0)
+          text.append("\n");
+        text.append(line);
+      }
+      br.close();
+      byte[] signature = sig.calculate(null, new ParseImpl(text.toString(),
+          null));
+      res.put(files[i].toString(), signature);
+    }
+    Iterator<String> it = res.keySet().iterator();
+    while (it.hasNext()) {
+      String name = it.next();
+      byte[] signature = res.get(name);
+      System.out.println(name + "\t" + StringUtil.toHexString(signature));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java b/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java
new file mode 100644
index 0000000..4675f83
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Partition urls by host, domain name or IP depending on the value of the
+ * parameter 'partition.url.mode' which can be 'byHost', 'byDomain' or 'byIP'
+ */
+public class URLPartitioner implements Partitioner<Text, Writable> {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(URLPartitioner.class);
+
+  public static final String PARTITION_MODE_KEY = "partition.url.mode";
+
+  public static final String PARTITION_MODE_HOST = "byHost";
+  public static final String PARTITION_MODE_DOMAIN = "byDomain";
+  public static final String PARTITION_MODE_IP = "byIP";
+
+  /** Value XOR-ed into every hash, read from 'partition.url.seed'. */
+  private int seed;
+  private URLNormalizers normalizers;
+  private String mode = PARTITION_MODE_HOST;
+
+  /**
+   * Reads the seed and the partition mode from the job and creates the URL
+   * normalizers for the partition scope. An unknown mode is logged and
+   * replaced by 'byHost'.
+   */
+  public void configure(JobConf job) {
+    seed = job.getInt("partition.url.seed", 0);
+    mode = job.get(PARTITION_MODE_KEY, PARTITION_MODE_HOST);
+    // check that the mode is known
+    if (!mode.equals(PARTITION_MODE_IP) && !mode.equals(PARTITION_MODE_DOMAIN)
+        && !mode.equals(PARTITION_MODE_HOST)) {
+      LOG.error("Unknown partition mode : " + mode + " - forcing to byHost");
+      mode = PARTITION_MODE_HOST;
+    }
+    normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_PARTITION);
+  }
+
+  public void close() {
+  }
+
+  /**
+   * Hash by host, domain name or IP address, depending on the configured mode.
+   * The hash falls back to the host name if the domain or the IP address
+   * cannot be determined, and to the URL string itself if the URL is
+   * malformed.
+   *
+   * @param key
+   *          the URL to partition
+   * @param value
+   *          not used
+   * @param numReduceTasks
+   *          number of partitions
+   * @return the partition number, in the range 0 to numReduceTasks - 1
+   */
+  public int getPartition(Text key, Writable value, int numReduceTasks) {
+    String urlString = key.toString();
+    URL url = null;
+    int hashCode = urlString.hashCode();
+    try {
+      urlString = normalizers.normalize(urlString,
+          URLNormalizers.SCOPE_PARTITION);
+      url = new URL(urlString);
+      hashCode = url.getHost().hashCode();
+    } catch (MalformedURLException e) {
+      LOG.warn("Malformed URL: '" + urlString + "'");
+    }
+
+    // url is null for a malformed URL: there is neither a domain nor a host to
+    // look up then, so the hash of the URL string is kept
+    if (url != null) {
+      if (mode.equals(PARTITION_MODE_DOMAIN)) {
+        // FetchItem.create checks this result for null as well; keep the host
+        // hash if no domain name is available
+        String domain = URLUtil.getDomainName(url);
+        if (domain != null) {
+          hashCode = domain.hashCode();
+        }
+      } else if (mode.equals(PARTITION_MODE_IP)) {
+        try {
+          InetAddress address = InetAddress.getByName(url.getHost());
+          hashCode = address.getHostAddress().hashCode();
+        } catch (UnknownHostException e) {
+          Generator.LOG.info("Couldn't find IP for host: " + url.getHost());
+        }
+      }
+    }
+
+    // make hosts wind up in different partitions on different runs
+    hashCode ^= seed;
+
+    // clear the sign bit so that the remainder is never negative
+    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/package.html
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/package.html b/nutch-core/src/main/java/org/apache/nutch/crawl/package.html
new file mode 100644
index 0000000..05eeb50
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/package.html
@@ -0,0 +1,5 @@
+<html>
+<body>
+Crawl control code and tools to run the crawler.
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java
new file mode 100644
index 0000000..3ad4970
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.Locale;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This class describes the item to be fetched.
+ */
+public class FetchItem {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FetchItem.class);
+
+  int outlinkDepth = 0;
+  /** ID of the queue this item belongs to, see {@link #create}. */
+  String queueID;
+  /** The URL as text. */
+  Text url;
+  /** The same URL as a parsed {@link URL}. */
+  URL u;
+  CrawlDatum datum;
+
+  /** Creates an item with an outlink depth of 0. */
+  public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
+    this(url, u, datum, queueID, 0);
+  }
+
+  /** Creates an item from already prepared values; nothing is validated. */
+  public FetchItem(Text url, URL u, CrawlDatum datum, String queueID,
+      int outlinkDepth) {
+    this.url = url;
+    this.u = u;
+    this.datum = datum;
+    this.queueID = queueID;
+    this.outlinkDepth = outlinkDepth;
+  }
+
+  /**
+   * Create an item. Queue id will be created based on <code>queueMode</code>
+   * argument, either as a protocol + hostname pair, protocol + IP address
+   * pair or protocol+domain pair.
+   */
+  public static FetchItem create(Text url, CrawlDatum datum, String queueMode) {
+    return create(url, datum, queueMode, 0);
+  }
+
+  /**
+   * Create an item with the given outlink depth. The queue id has the form
+   * <code>protocol://key</code>, in lower case, where the key is the IP
+   * address, the domain name or the host name of the URL, depending on
+   * <code>queueMode</code> (compared ignoring case; any value other than the
+   * IP and domain modes selects the host name).
+   *
+   * @return the new item, or <code>null</code> if the URL cannot be parsed or,
+   *         in IP mode, its host cannot be resolved
+   */
+  public static FetchItem create(Text url, CrawlDatum datum,
+      String queueMode, int outlinkDepth) {
+    String queueID;
+    URL u = null;
+    try {
+      u = new URL(url.toString());
+    } catch (Exception e) {
+      LOG.warn("Cannot parse url: " + url, e);
+      return null;
+    }
+    // Locale.ROOT: the queue id must not depend on the default locale of the
+    // JVM (e.g. the Turkish lower case of 'I' is not 'i')
+    final String proto = u.getProtocol().toLowerCase(Locale.ROOT);
+    String key;
+    if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
+      try {
+        final InetAddress addr = InetAddress.getByName(u.getHost());
+        key = addr.getHostAddress();
+      } catch (final UnknownHostException e) {
+        // unable to resolve it, so don't fall back to host name
+        LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
+        return null;
+      }
+    } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) {
+      key = URLUtil.getDomainName(u);
+      if (key == null) {
+        LOG.warn("Unknown domain for url: " + url
+            + ", using URL string as key");
+        key = u.toExternalForm();
+      }
+    } else {
+      key = u.getHost();
+      if (key == null) {
+        LOG.warn("Unknown host for url: " + url + ", using URL string as key");
+        key = u.toExternalForm();
+      }
+    }
+    queueID = proto + "://" + key.toLowerCase(Locale.ROOT);
+    return new FetchItem(url, u, datum, queueID, outlinkDepth);
+  }
+
+  public CrawlDatum getDatum() {
+    return datum;
+  }
+
+  public String getQueueID() {
+    return queueID;
+  }
+
+  /** Returns the URL as text. */
+  public Text getUrl() {
+    return url;
+  }
+
+  /** Returns the URL as a parsed {@link URL}. */
+  public URL getURL2() {
+    return u;
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java
new file mode 100644
index 0000000..182c063
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles FetchItems which come from the same host ID (be it a
+ * proto/hostname or proto/IP pair). It also keeps track of requests in
+ * progress and elapsed time between requests.
+ */
+public class FetchItemQueue {
+  
+  private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueues.class);
+
+  List<FetchItem> queue = Collections
+      .synchronizedList(new LinkedList<FetchItem>());
+  AtomicInteger inProgress = new AtomicInteger();
+  AtomicLong nextFetchTime = new AtomicLong();
+  AtomicInteger exceptionCounter = new AtomicInteger();
+  long crawlDelay;
+  long minCrawlDelay;
+  int maxThreads;
+  Configuration conf;
+
+  public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
+      long minCrawlDelay) {
+    this.conf = conf;
+    this.maxThreads = maxThreads;
+    this.crawlDelay = crawlDelay;
+    this.minCrawlDelay = minCrawlDelay;
+    // ready to start
+    setEndTime(System.currentTimeMillis() - crawlDelay);
+  }
+
+  public synchronized int emptyQueue() {
+    int presize = queue.size();
+    queue.clear();
+    return presize;
+  }
+
+  public int getQueueSize() {
+    return queue.size();
+  }
+
+  public int getInProgressSize() {
+    return inProgress.get();
+  }
+
+  public int incrementExceptionCounter() {
+    return exceptionCounter.incrementAndGet();
+  }
+
+  public void finishFetchItem(FetchItem it, boolean asap) {
+    if (it != null) {
+      inProgress.decrementAndGet();
+      setEndTime(System.currentTimeMillis(), asap);
+    }
+  }
+
+  public void addFetchItem(FetchItem it) {
+    if (it == null)
+      return;
+    queue.add(it);
+  }
+
+  public void addInProgressFetchItem(FetchItem it) {
+    if (it == null)
+      return;
+    inProgress.incrementAndGet();
+  }
+
+  public FetchItem getFetchItem() {
+    if (inProgress.get() >= maxThreads)
+      return null;
+    long now = System.currentTimeMillis();
+    if (nextFetchTime.get() > now)
+      return null;
+    FetchItem it = null;
+    if (queue.size() == 0)
+      return null;
+    try {
+      it = queue.remove(0);
+      inProgress.incrementAndGet();
+    } catch (Exception e) {
+      LOG.error(
+          "Cannot remove FetchItem from queue or cannot add it to inProgress queue",
+          e);
+    }
+    return it;
+  }
+
+  public synchronized void dump() {
+    LOG.info("  maxThreads    = " + maxThreads);
+    LOG.info("  inProgress    = " + inProgress.get());
+    LOG.info("  crawlDelay    = " + crawlDelay);
+    LOG.info("  minCrawlDelay = " + minCrawlDelay);
+    LOG.info("  nextFetchTime = " + nextFetchTime.get());
+    LOG.info("  now           = " + System.currentTimeMillis());
+    for (int i = 0; i < queue.size(); i++) {
+      FetchItem it = queue.get(i);
+      LOG.info("  " + i + ". " + it.url);
+    }
+  }
+
+  private void setEndTime(long endTime) {
+    setEndTime(endTime, false);
+  }
+
+  private void setEndTime(long endTime, boolean asap) {
+    if (!asap)
+      nextFetchTime.set(endTime
+          + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
+    else
+      nextFetchTime.set(endTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java
new file mode 100644
index 0000000..4473ff0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convenience class - a collection of queues that keeps track of the total
+ * number of items, and provides items eligible for fetching from any queue.
+ */
+public class FetchItemQueues {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueues.class);
+
+  /** Default queue id constant; not referenced inside this class. */
+  public static final String DEFAULT_ID = "default";
+
+  /**
+   * Queues keyed by queue id. This is a plain {@link HashMap}: every access
+   * must happen while holding this object's monitor.
+   */
+  Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
+  /** Number of items currently waiting in any queue. */
+  AtomicInteger totalSize = new AtomicInteger(0);
+  /** Value of {@code fetcher.threads.per.queue}, passed to each new queue. */
+  int maxThreads;
+  /** {@code fetcher.server.delay} converted from seconds to milliseconds. */
+  long crawlDelay;
+  /** {@code fetcher.server.min.delay} converted from seconds to milliseconds. */
+  long minCrawlDelay;
+  /**
+   * Value of {@code fetcher.timelimit}, compared against
+   * {@code System.currentTimeMillis()}; {@code -1} disables the check.
+   */
+  long timelimit = -1;
+  /**
+   * Value of {@code fetcher.max.exceptions.per.queue}; {@code -1} disables
+   * purging on exceptions.
+   */
+  int maxExceptionsPerQueue = -1;
+  Configuration conf;
+
+  public static final String QUEUE_MODE_HOST = "byHost";
+  public static final String QUEUE_MODE_DOMAIN = "byDomain";
+  public static final String QUEUE_MODE_IP = "byIP";
+
+  /** One of the {@code QUEUE_MODE_*} constants; unknown values fall back to byHost. */
+  String queueMode;
+
+  /**
+   * Reads the queue settings from the given configuration.
+   *
+   * @param conf configuration providing the {@code fetcher.*} properties
+   */
+  public FetchItemQueues(Configuration conf) {
+    this.conf = conf;
+    this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
+    queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
+    // check that the mode is known
+    if (!queueMode.equals(QUEUE_MODE_IP)
+        && !queueMode.equals(QUEUE_MODE_DOMAIN)
+        && !queueMode.equals(QUEUE_MODE_HOST)) {
+      LOG.error("Unknown partition mode : " + queueMode
+          + " - forcing to byHost");
+      queueMode = QUEUE_MODE_HOST;
+    }
+    LOG.info("Using queue mode : " + queueMode);
+
+    this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
+    this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
+        0.0f) * 1000);
+    this.timelimit = conf.getLong("fetcher.timelimit", -1);
+    this.maxExceptionsPerQueue = conf.getInt(
+        "fetcher.max.exceptions.per.queue", -1);
+  }
+
+  /**
+   * @return the number of items currently waiting in any queue
+   */
+  public int getTotalSize() {
+    return totalSize.get();
+  }
+
+  /**
+   * @return the number of queues currently registered
+   */
+  public synchronized int getQueueCount() {
+    // synchronized: the map is mutated by other synchronized methods
+    return queues.size();
+  }
+
+  /**
+   * Creates a {@link FetchItem} for the URL using the configured queue mode
+   * and enqueues it. Nothing is added when no item could be created.
+   *
+   * @param url   the URL to fetch
+   * @param datum the crawl datum belonging to the URL
+   */
+  public void addFetchItem(Text url, CrawlDatum datum) {
+    FetchItem it = FetchItem.create(url, datum, queueMode);
+    if (it != null)
+      addFetchItem(it);
+  }
+
+  /**
+   * Appends the item to the queue named by its queue id, creating that queue
+   * if necessary, and increments the total size.
+   *
+   * @param it the item to enqueue
+   */
+  public synchronized void addFetchItem(FetchItem it) {
+    FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+    fiq.addFetchItem(it);
+    totalSize.incrementAndGet();
+  }
+
+  /**
+   * Same as {@link #finishFetchItem(FetchItem, boolean)} with {@code asap}
+   * set to {@code false}.
+   *
+   * @param it the item whose fetch has finished
+   */
+  public void finishFetchItem(FetchItem it) {
+    finishFetchItem(it, false);
+  }
+
+  /**
+   * Reports a finished fetch to the queue the item belongs to. A warning is
+   * logged and nothing else happens when that queue is not registered.
+   *
+   * @param it   the item whose fetch has finished
+   * @param asap forwarded unchanged to the queue
+   */
+  public void finishFetchItem(FetchItem it, boolean asap) {
+    FetchItemQueue fiq;
+    // The lookup must hold the same monitor as the methods that add and
+    // remove queues; an unsynchronized HashMap.get() racing with them can
+    // miss an existing queue.
+    synchronized (this) {
+      fiq = queues.get(it.queueID);
+    }
+    if (fiq == null) {
+      LOG.warn("Attempting to finish item from unknown queue: " + it);
+      return;
+    }
+    fiq.finishFetchItem(it, asap);
+  }
+
+  /**
+   * Returns the queue registered under the given id, creating and
+   * registering a new one if none exists yet.
+   *
+   * @param id the queue id
+   * @return the existing or newly created queue, never {@code null}
+   */
+  public synchronized FetchItemQueue getFetchItemQueue(String id) {
+    FetchItemQueue fiq = queues.get(id);
+    if (fiq == null) {
+      // initialize queue
+      fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+      queues.put(id, fiq);
+    }
+    return fiq;
+  }
+
+  /**
+   * Returns an item from the first queue that is willing to hand one out.
+   * Queues with nothing queued and nothing in progress are removed on the
+   * way.
+   *
+   * @return an item to fetch, or {@code null} if no queue provides one
+   */
+  public synchronized FetchItem getFetchItem() {
+    Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
+        .iterator();
+    while (it.hasNext()) {
+      FetchItemQueue fiq = it.next().getValue();
+      // reap empty queues
+      if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+        it.remove();
+        continue;
+      }
+      FetchItem fit = fiq.getFetchItem();
+      if (fit != null) {
+        totalSize.decrementAndGet();
+        return fit;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Empties all queues once the configured time limit has been reached.
+   * Called only once the feeder has stopped.
+   *
+   * @return number of dropped items, {@code 0} if the limit is disabled or
+   *         not yet reached
+   */
+  public synchronized int checkTimelimit() {
+    int count = 0;
+
+    if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
+      // emptying the queues
+      count = emptyQueues();
+
+      // there might also be a case where totalsize !=0 but number of queues
+      // == 0
+      // in which case we simply force it to 0 to avoid blocking
+      if (totalSize.get() != 0 && queues.size() == 0)
+        totalSize.set(0);
+    }
+    return count;
+  }
+
+  /**
+   * Empties every non-empty queue (used by timebomb and throughput
+   * threshold) and lowers the total size accordingly.
+   *
+   * @return number of dropped items
+   */
+  public synchronized int emptyQueues() {
+    int count = 0;
+
+    for (Map.Entry<String, FetchItemQueue> entry : queues.entrySet()) {
+      FetchItemQueue fiq = entry.getValue();
+      if (fiq.getQueueSize() == 0)
+        continue;
+      LOG.info("* queue: " + entry.getKey() + " >> dropping! ");
+      int deleted = fiq.emptyQueue();
+      totalSize.addAndGet(-deleted);
+      count += deleted;
+    }
+
+    return count;
+  }
+
+  /**
+   * Increment the exception counter of a queue in case of an exception e.g.
+   * timeout; when higher than a given threshold simply empty the queue.
+   * The counter is left untouched when the queue is unknown or has no
+   * queued items.
+   *
+   * @param queueid
+   *          id of the queue in which the exception occurred
+   * @return number of purged items
+   */
+  public synchronized int checkExceptionThreshold(String queueid) {
+    FetchItemQueue fiq = queues.get(queueid);
+    if (fiq == null || fiq.getQueueSize() == 0) {
+      return 0;
+    }
+    int excCount = fiq.incrementExceptionCounter();
+    if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
+      // too many exceptions for items in this queue - purge it
+      int deleted = fiq.emptyQueue();
+      LOG.info("* queue: " + queueid + " >> removed " + deleted
+          + " URLs from queue because " + excCount + " exceptions occurred");
+      totalSize.addAndGet(-deleted);
+      return deleted;
+    }
+    return 0;
+  }
+
+  /**
+   * Logs the id and contents of every non-empty queue at INFO level.
+   */
+  public synchronized void dump() {
+    for (Map.Entry<String, FetchItemQueue> entry : queues.entrySet()) {
+      FetchItemQueue fiq = entry.getValue();
+      if (fiq.getQueueSize() == 0)
+        continue;
+      LOG.info("* queue: " + entry.getKey());
+      fiq.dump();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java
new file mode 100644
index 0000000..892c90f
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.parse.Outlink;
+
+/**
+ * Mutable value holder describing one fetched page: its URL, outlinks,
+ * status code, title and fetch time. All fields start out at their Java
+ * default values.
+ */
+public class FetchNode {
+  private Text url;
+  private Outlink[] outlinks;
+  private int status;
+  private String title;
+  private long fetchTime;
+
+  /** @return the URL, or {@code null} if none was set */
+  public Text getUrl() {
+    return this.url;
+  }
+
+  /** @param url the URL to store */
+  public void setUrl(Text url) {
+    this.url = url;
+  }
+
+  /** @return the stored outlink array (not copied), or {@code null} if none was set */
+  public Outlink[] getOutlinks() {
+    return this.outlinks;
+  }
+
+  /** @param links the outlink array to store (kept by reference) */
+  public void setOutlinks(Outlink[] links) {
+    this.outlinks = links;
+  }
+
+  /** @return the status value, {@code 0} if none was set */
+  public int getStatus() {
+    return this.status;
+  }
+
+  /** @param status the status value to store */
+  public void setStatus(int status) {
+    this.status = status;
+  }
+
+  /** @return the title, or {@code null} if none was set */
+  public String getTitle() {
+    return this.title;
+  }
+
+  /** @param title the title to store */
+  public void setTitle(String title) {
+    this.title = title;
+  }
+
+  /** @return the fetch time, {@code 0} if none was set */
+  public long getFetchTime() {
+    return this.fetchTime;
+  }
+
+  /** @param fetchTime the fetch time to store */
+  public void setFetchTime(long fetchTime) {
+    this.fetchTime = fetchTime;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java
new file mode 100644
index 0000000..2e69f31
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * In-memory registry of {@link FetchNode}s, keyed by a running integer index
+ * that starts at 1. A shared instance is available through
+ * {@link #getInstance()}.
+ */
+public class FetchNodeDb {
+
+  private final Map<Integer, FetchNode> fetchNodeDbMap;
+  /** Key for the next stored node; only touched inside {@link #put}. */
+  private int index;
+  private static FetchNodeDb fetchNodeDbInstance = null;
+
+  /**
+   * Creates an empty registry whose first key will be 1.
+   */
+  public FetchNodeDb() {
+    fetchNodeDbMap = new ConcurrentHashMap<Integer, FetchNode>();
+    index = 1;
+  }
+
+  /**
+   * Returns the shared instance, creating it on first use.
+   * Synchronized so that concurrent first calls cannot create two instances.
+   *
+   * @return the shared registry, never {@code null}
+   */
+  public static synchronized FetchNodeDb getInstance() {
+    if (fetchNodeDbInstance == null) {
+      fetchNodeDbInstance = new FetchNodeDb();
+    }
+    return fetchNodeDbInstance;
+  }
+
+  /**
+   * Stores the node under the next free index.
+   * Synchronized because incrementing the plain {@code int} index is not
+   * atomic; concurrent callers could otherwise obtain the same key and
+   * overwrite each other's node.
+   *
+   * @param url       not used by this method
+   * @param fetchNode the node to store, must not be {@code null}
+   * @throws NullPointerException if {@code fetchNode} is {@code null}
+   */
+  public synchronized void put(String url, FetchNode fetchNode) {
+    if (fetchNode == null) {
+      // fail before consuming an index
+      throw new NullPointerException("fetchNode");
+    }
+    fetchNodeDbMap.put(index++, fetchNode);
+  }
+
+  /**
+   * @return the live backing map (not a copy); changes made through it are
+   *         visible to this registry
+   */
+  public Map<Integer, FetchNode> getFetchNodeDb() {
+    return fetchNodeDbMap;
+  }
+}
\ No newline at end of file