Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:49:01 UTC

[45/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java
new file mode 100644
index 0000000..3ba3c81
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tool to create a HostDB from the CrawlDB. It aggregates fetch status values
+ * by host and checks DNS entries for hosts.
+ */
+public class UpdateHostDb extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDb.class);
+  public static final String LOCK_NAME = ".locked";
+
+  public static final String HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD = "hostdb.purge.failed.hosts.threshold";
+  public static final String HOSTDB_NUM_RESOLVER_THREADS = "hostdb.num.resolvers.threads";
+  public static final String HOSTDB_RECHECK_INTERVAL = "hostdb.recheck.interval";
+  public static final String HOSTDB_CHECK_FAILED = "hostdb.check.failed";
+  public static final String HOSTDB_CHECK_NEW = "hostdb.check.new";
+  public static final String HOSTDB_CHECK_KNOWN = "hostdb.check.known";
+  public static final String HOSTDB_FORCE_CHECK = "hostdb.force.check";
+  public static final String HOSTDB_URL_FILTERING = "hostdb.url.filter";
+  public static final String HOSTDB_URL_NORMALIZING = "hostdb.url.normalize";
+  public static final String HOSTDB_NUMERIC_FIELDS = "hostdb.numeric.fields";
+  public static final String HOSTDB_STRING_FIELDS = "hostdb.string.fields";
+  public static final String HOSTDB_PERCENTILES = "hostdb.percentiles";
+  
+  private void updateHostDb(Path hostDb, Path crawlDb, Path topHosts,
+    boolean checkFailed, boolean checkNew, boolean checkKnown,
+    boolean force, boolean filter, boolean normalize) throws Exception {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("UpdateHostDb: starting at " + sdf.format(start));
+
+    JobConf job = new NutchJob(getConf());
+    boolean preserveBackup = job.getBoolean("db.preserve.backup", true);
+    job.setJarByClass(UpdateHostDb.class);
+    job.setJobName("UpdateHostDb");
+
+    // Check whether the urlfilter-domainblacklist plugin is loaded
+    if (filter && new String("urlfilter-domainblacklist").matches(job.get("plugin.includes"))) {
+      throw new Exception("domainblacklist-urlfilter must not be enabled");
+    }
+
+    // Check whether the urlnormalizer-host plugin is loaded
+    if (normalize && new String("urlnormalizer-host").matches(job.get("plugin.includes"))) {
+      throw new Exception("urlnormalizer-host must not be enabled");
+    }
+
+    FileSystem fs = FileSystem.get(job);
+    Path old = new Path(hostDb, "old");
+    Path current = new Path(hostDb, "current");
+    Path tempHostDb = new Path(hostDb, "hostdb-"
+      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    // lock an existing hostdb to prevent multiple simultaneous updates
+    Path lock = new Path(hostDb, LOCK_NAME);
+    if (!fs.exists(current)) {
+      fs.mkdirs(current);
+    }
+    LockUtil.createLockFile(fs, lock, false);
+
+    MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
+
+    if (topHosts != null) {
+      MultipleInputs.addInputPath(job, topHosts, KeyValueTextInputFormat.class);
+    }
+    if (crawlDb != null) {
+      // Tell the job we read from CrawlDB
+      job.setBoolean("hostdb.reading.crawldb", true);
+      MultipleInputs.addInputPath(job, new Path(crawlDb,
+        CrawlDb.CURRENT_NAME), SequenceFileInputFormat.class);
+    }
+
+    FileOutputFormat.setOutputPath(job, tempHostDb);
+
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NutchWritable.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(HostDatum.class);
+    job.setMapperClass(UpdateHostDbMapper.class);
+    job.setReducerClass(UpdateHostDbReducer.class);
+
+    job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    job.setSpeculativeExecution(false);
+    job.setBoolean(HOSTDB_CHECK_FAILED, checkFailed);
+    job.setBoolean(HOSTDB_CHECK_NEW, checkNew);
+    job.setBoolean(HOSTDB_CHECK_KNOWN, checkKnown);
+    job.setBoolean(HOSTDB_FORCE_CHECK, force);
+    job.setBoolean(HOSTDB_URL_FILTERING, filter);
+    job.setBoolean(HOSTDB_URL_NORMALIZING, normalize);
+    job.setClassLoader(Thread.currentThread().getContextClassLoader());
+    
+    try {
+      JobClient.runJob(job);
+
+      FSUtils.replace(fs, old, current, true);
+      FSUtils.replace(fs, current, tempHostDb, true);
+
+      if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
+    } catch (Exception e) {
+      if (fs.exists(tempHostDb)) {
+        fs.delete(tempHostDb, true);
+      }
+      LockUtil.removeLockFile(fs, lock);
+      throw e;
+    }
+
+    LockUtil.removeLockFile(fs, lock);
+    long end = System.currentTimeMillis();
+    LOG.info("UpdateHostDb: finished at " + sdf.format(end) +
+      ", elapsed: " + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new UpdateHostDb(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: UpdateHostDb -hostdb <hostdb> " +
+        "[-tophosts <tophosts>] [-crawldb <crawldb>] [-checkAll] [-checkFailed]" +
+        " [-checkNew] [-checkKnown] [-force] [-filter] [-normalize]");
+      return -1;
+    }
+
+    Path hostDb = null;
+    Path crawlDb = null;
+    Path topHosts = null;
+
+    boolean checkFailed = false;
+    boolean checkNew = false;
+    boolean checkKnown = false;
+    boolean force = false;
+
+    boolean filter = false;
+    boolean normalize = false;
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-hostdb")) {
+        hostDb = new Path(args[i + 1]);
+        LOG.info("UpdateHostDb: hostdb: " + hostDb);
+        i++;
+      }
+      if (args[i].equals("-crawldb")) {
+        crawlDb = new Path(args[i + 1]);
+        LOG.info("UpdateHostDb: crawldb: " + crawlDb);
+        i++;
+      }
+      if (args[i].equals("-tophosts")) {
+        topHosts = new Path(args[i + 1]);
+        LOG.info("UpdateHostDb: tophosts: " + topHosts);
+        i++;
+      }
+
+      if (args[i].equals("-checkFailed")) {
+        LOG.info("UpdateHostDb: checking failed hosts");
+        checkFailed = true;
+      }
+      if (args[i].equals("-checkNew")) {
+        LOG.info("UpdateHostDb: checking new hosts");
+        checkNew = true;
+      }
+      if (args[i].equals("-checkKnown")) {
+        LOG.info("UpdateHostDb: checking known hosts");
+        checkKnown = true;
+      }
+      if (args[i].equals("-checkAll")) {
+        LOG.info("UpdateHostDb: checking all hosts");
+        checkFailed = true;
+        checkNew = true;
+        checkKnown = true;
+      }
+      if (args[i].equals("-force")) {
+        LOG.info("UpdateHostDb: forced check");
+        force = true;
+      }
+      if (args[i].equals("-filter")) {
+        LOG.info("UpdateHostDb: filtering enabled");
+        filter = true;
+      }
+      if (args[i].equals("-normalize")) {
+        LOG.info("UpdateHostDb: normalizing enabled");
+        normalize = true;
+      }
+    }
+
+    if (hostDb == null) {
+      System.err.println("hostDb is mandatory");
+      return -1;
+    }
+
+    try {
+      updateHostDb(hostDb, crawlDb, topHosts, checkFailed, checkNew,
+        checkKnown, force, filter, normalize);
+
+      return 0;
+    } catch (Exception e) {
+      LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
\ No newline at end of file
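
A note on usage: the tool above is driven through Hadoop's ToolRunner, as in its main()
method. A minimal sketch of an equivalent programmatic invocation; the class name and the
paths crawl/hostdb and crawl/crawldb are hypothetical placeholders:

import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.hostdb.UpdateHostDb;
import org.apache.nutch.util.NutchConfiguration;

public class UpdateHostDbExample {
  public static void main(String[] args) throws Exception {
    // Same as: UpdateHostDb -hostdb crawl/hostdb -crawldb crawl/crawldb -checkAll
    // (both paths are hypothetical placeholders)
    int res = ToolRunner.run(NutchConfiguration.create(), new UpdateHostDb(),
        new String[] { "-hostdb", "crawl/hostdb", "-crawldb", "crawl/crawldb", "-checkAll" });
    System.exit(res);
  }
}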

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
new file mode 100644
index 0000000..5844b04
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.util.URLUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper ingesting HostDB and CrawlDB entries. It can also read
+ * host score info from a plain text key/value file generated by the
+ * Webgraph's NodeDumper tool.
+ */
+public class UpdateHostDbMapper
+  implements Mapper<Text, Writable, Text, NutchWritable> {
+  
+  public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbMapper.class);
+  protected Text host = new Text();
+  protected HostDatum hostDatum = null;
+  protected CrawlDatum crawlDatum = null;
+  protected String reprUrl = null;
+  protected String buffer = null;
+  protected String[] args = null;
+  protected boolean filter = false;
+  protected boolean normalize = false;
+  protected boolean readingCrawlDb = false;
+  protected URLFilters filters = null;
+  protected URLNormalizers normalizers = null;
+
+  public void close() {}
+
+  /**
+   * @param job the job configuration
+   */
+  public void configure(JobConf job) {
+    readingCrawlDb = job.getBoolean("hostdb.reading.crawldb", false);
+    filter = job.getBoolean(UpdateHostDb.HOSTDB_URL_FILTERING, false);
+    normalize = job.getBoolean(UpdateHostDb.HOSTDB_URL_NORMALIZING, false);
+
+    if (filter)
+      filters = new URLFilters(job);
+    if (normalize)
+      normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_DEFAULT);
+  }
+
+  /**
+   * Filters and/or normalizes the input URL, which is passed in as a hostname.
+   *
+   * @param url the hostname to filter and/or normalize
+   * @return the resulting host, or null if the URL was filtered out
+   */
+  protected String filterNormalize(String url) {
+    // We actually receive a hostname here so let's make a URL
+    // TODO: we force shop.fcgroningen to be https, how do we know that here?
+    // http://issues.openindex.io/browse/SPIDER-40
+    url = "http://" + url + "/";
+
+    try {
+      if (normalize)
+        url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
+      if (filter)
+        url = filters.filter(url);
+      if (url == null)
+        return null;
+    } catch (Exception e) {
+      return null;
+    }
+
+    // Turn back to host
+    return URLUtil.getHost(url);
+  }
+
+  /**
+    * Mapper ingesting records from the HostDB, CrawlDB and plaintext host
+    * scores file. Statistics and scores are passed on.
+    *
+    * @param key the record key, a URL or hostname
+    * @param value a CrawlDatum, HostDatum or Text score value
+    * @param output collector for host-keyed NutchWritable records
+    * @param reporter progress reporter and counters
+    */
+  public void map(Text key, Writable value,
+    OutputCollector<Text,NutchWritable> output, Reporter reporter)
+    throws IOException {
+
+    // Get the key!
+    String keyStr = key.toString();
+
+    // Check if we process records from the CrawlDB
+    if (key instanceof Text && value instanceof CrawlDatum) {
+      // Get the normalized and filtered host of this URL
+      buffer = filterNormalize(URLUtil.getHost(keyStr));
+
+      // Filtered out?
+      if (buffer == null) {
+        reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+        LOG.info("UpdateHostDb: " + URLUtil.getHost(keyStr) + " crawldatum has been filtered");
+        return;
+      }
+
+      // Set the host of this URL
+      host.set(buffer);
+      crawlDatum = (CrawlDatum)value;
+      hostDatum = new HostDatum();
+
+      /**
+        * TODO: fix multi redirects: host_a => host_b/page => host_c/page/whatever
+        * http://www.ferienwohnung-armbruster.de/
+        * http://www.ferienwohnung-armbruster.de/website/
+        * http://www.ferienwohnung-armbruster.de/website/willkommen.php
+        *
+        * We cannot re-resolve redirects for host objects as CrawlDatum metadata is
+        * not available. We also cannot reliably use the reducer in all cases
+        * since redirects may be across hosts or even domains. The example
+        * above has redirects that will end up in the same reducer. During that
+        * phase, however, we do not know which URL redirects to the next URL.
+        */
+      // Do not resolve homepages when the root URL is unfetched
+      if (crawlDatum.getStatus() != CrawlDatum.STATUS_DB_UNFETCHED) {
+        // Get the protocol
+        String protocol = URLUtil.getProtocol(keyStr);
+        
+        // Get the proposed homepage URL
+        String homepage = protocol + "://" + buffer + "/";
+
+        // Check if the current key equals the proposed homepage URL
+        if (keyStr.equals(homepage)) {
+          // Check if this is a redirect to the real home page
+          if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM ||
+            crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+
+            // Obtain the repr url for this redirect via protocolstatus from the metadata
+            ProtocolStatus z = (ProtocolStatus)crawlDatum.getMetaData().
+              get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+
+            // Get the protocol status' arguments
+            args = z.getArgs();
+
+            // ..and the possible redirect URL
+            reprUrl = args[0];
+
+            // Am i a redirect?
+            if (reprUrl != null) {
+              LOG.info("UpdateHostDb: homepage: " + keyStr + " redirects to: " + args[0]);
+              output.collect(host, new NutchWritable(hostDatum));
+              hostDatum.setHomepageUrl(reprUrl);
+            } else {
+              LOG.info("UpdateHostDb: homepage: " + keyStr + 
+                " redirects to: " + args[0] + " but has been filtered out");
+            }
+          } else {
+            hostDatum.setHomepageUrl(homepage);
+            output.collect(host, new NutchWritable(hostDatum));
+            LOG.info("UpdateHostDb: homepage: " + homepage);
+          }
+        }
+      }
+
+      // Always emit crawl datum
+      output.collect(host, new NutchWritable(crawlDatum));
+    }
+
+    // Check if we got a record from the hostdb
+    if (key instanceof Text && value instanceof HostDatum) {
+      buffer = filterNormalize(keyStr);
+
+      // Filtered out?
+      if (buffer == null) {
+        reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+        LOG.info("UpdateHostDb: " + key.toString() + " hostdatum has been filtered");
+        return;
+      }
+
+      // Get a HostDatum
+      hostDatum = (HostDatum)value;
+      key.set(buffer);
+
+      // If we're also reading CrawlDb entries, reset db_* statistics because
+      // we're aggregating them from CrawlDB anyway
+      if (readingCrawlDb) {
+        hostDatum.resetStatistics();
+      }
+
+      output.collect(key, new NutchWritable(hostDatum));
+    }
+
+    // Check if we got a record with host scores
+    if (key instanceof Text && value instanceof Text) {
+      buffer = filterNormalize(keyStr);
+
+      // Filtered out?
+      if (buffer == null) {
+        reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+        LOG.info("UpdateHostDb: " + key.toString() + " score has been filtered");
+        return;
+      }
+
+      key.set(buffer);
+
+      output.collect(key,
+        new NutchWritable(new FloatWritable(Float.parseFloat(value.toString()))));
+    }
+  }
+}
\ No newline at end of file
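
The filterNormalize() helper above reuses the standard URL normalizers and filters on bare
hostnames by wrapping them in a URL first. A standalone sketch of that round trip, assuming
a default Nutch configuration; the class name and the host www.Example.org are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.URLUtil;

public class FilterNormalizeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    URLNormalizers normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
    URLFilters filters = new URLFilters(conf);

    // Wrap the hostname in a URL, as the mapper does
    String url = "http://" + "www.Example.org" + "/";
    url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
    url = filters.filter(url); // null if the URL is filtered out

    // Turn the (possibly rewritten) URL back into a host
    String host = (url == null) ? null : URLUtil.getHost(url);
    System.out.println(host);
  }
}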

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
new file mode 100644
index 0000000..33dd18b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+
+import com.tdunning.math.stats.TDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reducer aggregating CrawlDatum statuses, metadata statistics and scores per
+ * host into a single HostDatum. Hosts eligible for a (re)check are handed to a
+ * pool of resolver threads before being written out.
+ */
+public class UpdateHostDbReducer
+  implements Reducer<Text, NutchWritable, Text, HostDatum> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbReducer.class);
+  protected ResolverThread resolverThread = null;
+  protected Integer numResolverThreads = 10;
+  protected static Integer purgeFailedHostsThreshold = -1;
+  protected static Integer recheckInterval = 86400000;
+  protected static boolean checkFailed = false;
+  protected static boolean checkNew = false;
+  protected static boolean checkKnown = false;
+  protected static boolean force = false;
+  protected static long now = new Date().getTime();
+  protected static String[] numericFields;
+  protected static String[] stringFields;
+  protected static int[] percentiles;
+  protected static Text[] numericFieldWritables;
+  protected static Text[] stringFieldWritables;
+  
+  protected BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+  protected ThreadPoolExecutor executor = null;
+
+  /**
+    * Configures the thread pool and prestarts all resolver threads.
+    *
+    * @param job the job configuration
+    */
+  public void configure(JobConf job) {
+    purgeFailedHostsThreshold = job.getInt(UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1);
+    numResolverThreads = job.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, 10);
+    recheckInterval = job.getInt(UpdateHostDb.HOSTDB_RECHECK_INTERVAL, 86400) * 1000;
+    checkFailed = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_FAILED, false);
+    checkNew = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_NEW, false);
+    checkKnown = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_KNOWN, false);
+    force = job.getBoolean(UpdateHostDb.HOSTDB_FORCE_CHECK, false);
+    numericFields = job.getStrings(UpdateHostDb.HOSTDB_NUMERIC_FIELDS);
+    stringFields = job.getStrings(UpdateHostDb.HOSTDB_STRING_FIELDS);
+    percentiles = job.getInts(UpdateHostDb.HOSTDB_PERCENTILES);
+    
+    // What fields do we need to collect metadata from
+    if (numericFields != null) {
+      numericFieldWritables = new Text[numericFields.length];
+      for (int i = 0; i < numericFields.length; i++) {
+        numericFieldWritables[i] = new Text(numericFields[i]);
+      }
+    }
+    
+    if (stringFields != null) {
+      stringFieldWritables = new Text[stringFields.length];
+      for (int i = 0; i < stringFields.length; i++) {
+        stringFieldWritables[i] = new Text(stringFields[i]);
+      }
+    }
+
+    // Initialize the thread pool with our queue
+    executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads,
+      5, TimeUnit.SECONDS, queue);
+
+    // Run all threads in the pool
+    executor.prestartAllCoreThreads();
+  }
+
+  /**
+    * Merges all CrawlDatum, HostDatum and score records for a single host into
+    * one HostDatum, then either queues the host for (re)checking by a resolver
+    * thread or writes the datum out directly.
+    */
+  public void reduce(Text key, Iterator<NutchWritable> values,
+    OutputCollector<Text,HostDatum> output, Reporter reporter) throws IOException {
+
+    Map<String,Map<String,Integer>> stringCounts = new HashMap<String,Map<String, Integer>>();
+    Map<String,Float> maximums = new HashMap<String,Float>();
+    Map<String,Float> sums = new HashMap<String,Float>(); // used to calc averages
+    Map<String,Integer> counts = new HashMap<String,Integer>(); // used to calc averages
+    Map<String,Float> minimums = new HashMap<String,Float>();
+    Map<String,TDigest> tdigests = new HashMap<String,TDigest>();
+    
+    HostDatum hostDatum = new HostDatum();
+    float score = 0;
+    
+    if (stringFields != null) {
+      for (int i = 0; i < stringFields.length; i++) {
+        stringCounts.put(stringFields[i], new HashMap<String,Integer>());
+      }
+    }
+    
+    // Loop through all values until we find a non-empty HostDatum, or use
+    // an empty one if this is a new host for the HostDB
+    while (values.hasNext()) {
+      Writable value = values.next().get();
+      
+      // Count crawl datum statuses and collect metadata from fields
+      if (value instanceof CrawlDatum) {
+        CrawlDatum buffer = (CrawlDatum)value;
+        
+        // Set the correct status field
+        switch (buffer.getStatus()) {
+          case CrawlDatum.STATUS_DB_UNFETCHED:
+            hostDatum.setUnfetched(hostDatum.getUnfetched() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_FETCHED:
+            hostDatum.setFetched(hostDatum.getFetched() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_GONE:
+            hostDatum.setGone(hostDatum.getGone() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_REDIR_TEMP:
+            hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_REDIR_PERM:
+            hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_NOTMODIFIED:
+            hostDatum.setNotModified(hostDatum.getNotModified() + 1);
+            break;
+        }
+        
+        // Record connection failures
+        if (buffer.getRetriesSinceFetch() != 0) {
+          hostDatum.incConnectionFailures();
+        }
+        
+        // Only gather metadata statistics for properly fetched pages
+        if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {            
+          // Deal with the string fields
+          if (stringFields != null) {
+            for (int i = 0; i < stringFields.length; i++) {
+              // Does this field exist?
+              if (buffer.getMetaData().get(stringFieldWritables[i]) != null) {
+                // Get it!
+                String metadataValue = null;
+                try {
+                  metadataValue = buffer.getMetaData().get(stringFieldWritables[i]).toString();
+                } catch (Exception e) {
+                  LOG.error("Metadata field " + stringFields[i] + " is probably not a numeric value");
+                }
+              
+                // Does the value exist?
+                if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) {
+                  // Yes, increment it
+                  stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1);
+                } else {
+                  // Create it!
+                  stringCounts.get(stringFields[i]).put(metadataValue, 1);
+                }
+              }
+            }
+          }
+          
+          // Deal with the numeric fields
+          if (numericFields != null) {
+            for (int i = 0; i < numericFields.length; i++) {
+              // Does this field exist?
+              if (buffer.getMetaData().get(numericFieldWritables[i]) != null) {
+                try {
+                  // Get it!
+                  Float metadataValue = Float.parseFloat(buffer.getMetaData().get(numericFieldWritables[i]).toString());
+                  
+                  // Does a digest for this field exist?
+                  if (tdigests.containsKey(numericFields[i])) {
+                    tdigests.get(numericFields[i]).add(metadataValue);
+                  } else {
+                    // Create it!
+                    TDigest tdigest = TDigest.createDigest(100);
+                    tdigest.add((double)metadataValue);
+                    tdigests.put(numericFields[i], tdigest);
+                  }
+                
+                  // Does the minimum value exist?
+                  if (minimums.containsKey(numericFields[i])) {
+                    // Write if this is lower than existing value
+                    if (metadataValue < minimums.get(numericFields[i])) {
+                      minimums.put(numericFields[i], metadataValue);
+                    }
+                  } else {
+                    // Create it!
+                    minimums.put(numericFields[i], metadataValue);
+                  }
+                  
+                  // Does the maximum value exist?
+                  if (maximums.containsKey(numericFields[i])) {
+                    // Write if this is higher than existing value
+                    if (metadataValue > maximums.get(numericFields[i])) {
+                      maximums.put(numericFields[i], metadataValue);
+                    }
+                  } else {
+                    // Create it!
+                    maximums.put(numericFields[i], metadataValue);
+                  }
+                  
+                  // Sum it up!
+                  if (sums.containsKey(numericFields[i])) {
+                    // Increment
+                    sums.put(numericFields[i], sums.get(numericFields[i]) + metadataValue);
+                    counts.put(numericFields[i], counts.get(numericFields[i]) + 1);
+                  } else {
+                    // Create it!
+                    sums.put(numericFields[i], metadataValue);
+                    counts.put(numericFields[i], 1);
+                  }
+                } catch (Exception e) {
+                  LOG.error(e.getMessage() + " when processing values for " + key.toString());
+                }
+              }
+            }
+          }
+        }
+      }
+      
+      // Merge data carried by an existing HostDatum record
+      if (value instanceof HostDatum) {
+        HostDatum buffer = (HostDatum)value;
+
+        // Check homepage URL
+        if (buffer.hasHomepageUrl()) {
+          hostDatum.setHomepageUrl(buffer.getHomepageUrl());
+        }
+
+        // Check lastCheck timestamp
+        if (!buffer.isEmpty()) {
+          hostDatum.setLastCheck(buffer.getLastCheck());
+        }
+
+        // Check and set DNS failures
+        if (buffer.getDnsFailures() > 0) {
+          hostDatum.setDnsFailures(buffer.getDnsFailures());
+        }
+
+        // Check and set connection failures
+        if (buffer.getConnectionFailures() > 0) {
+          hostDatum.setConnectionFailures(buffer.getConnectionFailures());
+        }
+        
+        // Check metadata
+        if (!buffer.getMetaData().isEmpty()) {
+          hostDatum.setMetaData(buffer.getMetaData());
+        }
+
+        // Check and set score (score from Web Graph has precedence)
+        if (buffer.getScore() > 0) {
+          hostDatum.setScore(buffer.getScore());
+        }
+      }
+
+      // Check for the score
+      if (value instanceof FloatWritable) {
+        FloatWritable buffer = (FloatWritable)value;
+        score = buffer.get();
+      }
+    }
+
+    // Check if score was set from Web Graph
+    if (score > 0) {
+      hostDatum.setScore(score);
+    }
+    
+    // Set metadata
+    for (Map.Entry<String, Map<String,Integer>> entry : stringCounts.entrySet()) {
+      for (Map.Entry<String,Integer> subEntry : entry.getValue().entrySet()) {
+        hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new IntWritable(subEntry.getValue()));
+      }
+    }
+    for (Map.Entry<String, Float> entry : maximums.entrySet()) {
+      hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue()));
+    }
+    for (Map.Entry<String, Float> entry : sums.entrySet()) {
+      hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / counts.get(entry.getKey())));
+    }
+    for (Map.Entry<String, TDigest> entry : tdigests.entrySet()) {
+      // Emit all percentiles
+      for (int i = 0; i < percentiles.length; i++) {
+        hostDatum.getMetaData().put(new Text("pct" + Integer.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile(0.5)));
+      }
+    }      
+    for (Map.Entry<String, Float> entry : minimums.entrySet()) {
+      hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue()));
+    }
+    
+    reporter.incrCounter("UpdateHostDb", "total_hosts", 1);
+
+    // See if this record is to be checked
+    if (shouldCheck(hostDatum)) {
+      // Make an entry
+      resolverThread = new ResolverThread(key.toString(), hostDatum, output, reporter, purgeFailedHostsThreshold);
+
+      // Add the entry to the queue (blocking)
+      try {
+        queue.put(resolverThread);
+      } catch (InterruptedException e) {
+        LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+      }
+
+      // Do not progress, the datum will be written in the resolver thread
+      return;
+    } else {
+      reporter.incrCounter("UpdateHostDb", "skipped_not_eligible", 1);
+      LOG.info("UpdateHostDb: " + key.toString() + ": skipped_not_eligible");
+    }
+
+    // Write the host datum if it wasn't written by the resolver thread
+    output.collect(key, hostDatum);
+  }
+
+  /**
+    * Determines whether a record should be checked.
+    *
+    * @param datum the HostDatum to examine
+    * @return true if the record should be checked
+    */
+  protected boolean shouldCheck(HostDatum datum) {
+    // Whether a new record is to be checked
+    if (checkNew && datum.isEmpty()) {
+      return true;
+    }
+
+    // Whether existing known hosts should be rechecked
+    if (checkKnown && !datum.isEmpty() && datum.getDnsFailures() == 0) {
+      return isEligibleForCheck(datum);
+    }
+
+    // Whether failed records are forced to be rechecked
+    if (checkFailed && datum.getDnsFailures() > 0) {
+      return isEligibleForCheck(datum);
+    }
+
+    // It seems this record is not to be checked
+    return false;
+  }
+
+  /**
+    * Determines whether a record is eligible for recheck.
+    *
+    * @param datum the HostDatum to examine
+    * @return true if the record is eligible for a recheck
+    */
+  protected boolean isEligibleForCheck(HostDatum datum) {
+    // Whether an existing host, known or unknown, is forced to be rechecked
+    if (force || datum.getLastCheck().getTime() +
+      (recheckInterval * datum.getDnsFailures() + 1) > now) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+    * Shut down all running threads and wait for completion.
+    */
+  public void close() {
+    LOG.info("UpdateHostDb: feeder finished, waiting for shutdown");
+
+    // If we're here all keys have been fed and we can issue a shut down
+    executor.shutdown();
+
+    boolean finished = false;
+
+    // Wait until all resolvers have finished
+    while (!finished) {
+      try {
+        // Wait for the executor to shut down completely
+        if (!executor.isTerminated()) {
+          LOG.info("UpdateHostDb: resolver threads waiting: " + Integer.toString(executor.getPoolSize()));
+          Thread.sleep(1000);
+        } else {
+          // All is well, get out
+          finished = true;
+        }
+      } catch (InterruptedException e) {
+        // Interrupted while waiting for the executor; log and keep waiting
+        LOG.warn(StringUtils.stringifyException(e));
+      }
+    }
+  }
+}
\ No newline at end of file
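
The reducer's metadata aggregation is driven entirely by configuration, read in configure()
above. A sketch of the relevant settings; the class name and the field names fetchDuration
and Content-Type are hypothetical examples:

import org.apache.hadoop.mapred.JobConf;
import org.apache.nutch.hostdb.UpdateHostDb;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;

public class HostDbAggregationConfig {
  public static void main(String[] args) {
    JobConf job = new NutchJob(NutchConfiguration.create());

    // Numeric CrawlDatum metadata fields: the reducer emits min.<field>, max.<field>,
    // avg.<field> and pct<N>.<field> per host (field name here is hypothetical)
    job.setStrings(UpdateHostDb.HOSTDB_NUMERIC_FIELDS, "fetchDuration");

    // String fields are counted per distinct value as <field>.<value>
    job.setStrings(UpdateHostDb.HOSTDB_STRING_FIELDS, "Content-Type");

    // Percentiles computed from the per-field TDigest
    job.set(UpdateHostDb.HOSTDB_PERCENTILES, "50,75,95");

    // Recheck interval (seconds) and number of resolver threads
    job.setInt(UpdateHostDb.HOSTDB_RECHECK_INTERVAL, 86400);
    job.setInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, 10);
  }
}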

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java b/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java
new file mode 100644
index 0000000..c16003a
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class scans CrawlDB looking for entries with status DB_GONE (404) or
+ * DB_DUPLICATE and sends delete requests to indexers for those documents.
+ */
+
+public class CleaningJob implements Tool {
+  public static final Logger LOG = LoggerFactory.getLogger(CleaningJob.class);
+  private Configuration conf;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public static class DBFilter implements
+      Mapper<Text, CrawlDatum, ByteWritable, Text> {
+    private ByteWritable OUT = new ByteWritable(CrawlDatum.STATUS_DB_GONE);
+
+    @Override
+    public void configure(JobConf arg0) {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<ByteWritable, Text> output, Reporter reporter)
+        throws IOException {
+
+      if (value.getStatus() == CrawlDatum.STATUS_DB_GONE
+          || value.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+        output.collect(OUT, key);
+      }
+    }
+  }
+
+  public static class DeleterReducer implements
+      Reducer<ByteWritable, Text, Text, ByteWritable> {
+    private static final int NUM_MAX_DELETE_REQUEST = 1000;
+    private int numDeletes = 0;
+    private int totalDeleted = 0;
+
+    private boolean noCommit = false;
+
+    IndexWriters writers = null;
+
+    @Override
+    public void configure(JobConf job) {
+      writers = new IndexWriters(job);
+      try {
+        writers.open(job, "Deletion");
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      noCommit = job.getBoolean("noCommit", false);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // BUFFERING OF CALLS TO INDEXER SHOULD BE HANDLED AT INDEXER LEVEL
+      // if (numDeletes > 0) {
+      // LOG.info("CleaningJob: deleting " + numDeletes + " documents");
+      // // TODO updateRequest.process(solr);
+      // totalDeleted += numDeletes;
+      // }
+
+      writers.close();
+
+      if (totalDeleted > 0 && !noCommit) {
+        writers.commit();
+      }
+
+      LOG.info("CleaningJob: deleted a total of " + totalDeleted + " documents");
+    }
+
+    @Override
+    public void reduce(ByteWritable key, Iterator<Text> values,
+        OutputCollector<Text, ByteWritable> output, Reporter reporter)
+        throws IOException {
+      while (values.hasNext()) {
+        Text document = values.next();
+        writers.delete(document.toString());
+        totalDeleted++;
+        reporter.incrCounter("CleaningJobStatus", "Deleted documents", 1);
+        // if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
+        // LOG.info("CleaningJob: deleting " + numDeletes
+        // + " documents");
+        // // TODO updateRequest.process(solr);
+        // // TODO updateRequest = new UpdateRequest();
+        // writers.delete(key.toString());
+        // totalDeleted += numDeletes;
+        // numDeletes = 0;
+        // }
+      }
+    }
+  }
+
+  public void delete(String crawldb, boolean noCommit) throws IOException {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("CleaningJob: starting at " + sdf.format(start));
+
+    JobConf job = new NutchJob(getConf());
+
+    FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
+    job.setBoolean("noCommit", noCommit);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setMapOutputKeyClass(ByteWritable.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setMapperClass(DBFilter.class);
+    job.setReducerClass(DeleterReducer.class);
+
+    job.setJobName("CleaningJob");
+
+    // need to explicitly allow deletions
+    job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
+
+    JobClient.runJob(job);
+
+    long end = System.currentTimeMillis();
+    LOG.info("CleaningJob: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public int run(String[] args) throws IOException {
+    if (args.length < 1) {
+      String usage = "Usage: CleaningJob <crawldb> [-noCommit]";
+      LOG.error("Missing crawldb. " + usage);
+      System.err.println(usage);
+      IndexWriters writers = new IndexWriters(getConf());
+      System.err.println(writers.describe());
+      return 1;
+    }
+
+    boolean noCommit = false;
+    if (args.length == 2 && args[1].equals("-noCommit")) {
+      noCommit = true;
+    }
+
+    try {
+      delete(args[0], noCommit);
+    } catch (final Exception e) {
+      LOG.error("CleaningJob: " + StringUtils.stringifyException(e));
+      System.err.println("ERROR CleaningJob: "
+          + StringUtils.stringifyException(e));
+      return -1;
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(NutchConfiguration.create(), new CleaningJob(),
+        args);
+    System.exit(result);
+  }
+}
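
As with UpdateHostDb, the job is normally launched through ToolRunner. A minimal invocation
sketch; the class name and the path crawl/crawldb are hypothetical placeholders:

import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.indexer.CleaningJob;
import org.apache.nutch.util.NutchConfiguration;

public class CleaningJobExample {
  public static void main(String[] args) throws Exception {
    // Same as: CleaningJob crawl/crawldb -noCommit (path is a hypothetical placeholder)
    int res = ToolRunner.run(NutchConfiguration.create(), new CleaningJob(),
        new String[] { "crawl/crawldb", "-noCommit" });
    System.exit(res);
  }
}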

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java
new file mode 100644
index 0000000..fbbf2e8
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.plugin.Pluggable;
+
+public interface IndexWriter extends Pluggable, Configurable {
+  /** The name of the extension point. */
+  final static String X_POINT_ID = IndexWriter.class.getName();
+
+  public void open(JobConf job, String name) throws IOException;
+
+  public void write(NutchDocument doc) throws IOException;
+
+  public void delete(String key) throws IOException;
+
+  public void update(NutchDocument doc) throws IOException;
+
+  public void commit() throws IOException;
+
+  public void close() throws IOException;
+
+  /**
+   * Returns a String describing the IndexWriter instance and the specific
+   * parameters it can take
+   */
+  public String describe();
+}
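
For plugin authors, a minimal no-op sketch of an implementation of the interface above; the
class name is hypothetical, and a real plugin would talk to an indexing backend in these
methods:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.nutch.indexer.IndexWriter;
import org.apache.nutch.indexer.NutchDocument;

public class NoOpIndexWriter implements IndexWriter {
  private Configuration conf;

  @Override public void open(JobConf job, String name) throws IOException { /* connect to the backend */ }
  @Override public void write(NutchDocument doc) throws IOException { /* buffer the document */ }
  @Override public void update(NutchDocument doc) throws IOException { write(doc); }
  @Override public void delete(String key) throws IOException { /* queue a delete by id */ }
  @Override public void commit() throws IOException { /* flush buffered operations */ }
  @Override public void close() throws IOException { /* release resources */ }
  @Override public String describe() { return "NoOpIndexWriter: discards all documents\n"; }

  @Override public Configuration getConf() { return conf; }
  @Override public void setConf(Configuration conf) { this.conf = conf; }
}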

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java
new file mode 100644
index 0000000..681812b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.plugin.Extension;
+import org.apache.nutch.plugin.ExtensionPoint;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.plugin.PluginRuntimeException;
+import org.apache.nutch.util.ObjectCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Creates and caches {@link IndexWriter} implementing plugins. */
+public class IndexWriters {
+
+  public final static Logger LOG = LoggerFactory.getLogger(IndexWriters.class);
+
+  private IndexWriter[] indexWriters;
+
+  public IndexWriters(Configuration conf) {
+    ObjectCache objectCache = ObjectCache.get(conf);
+    synchronized (objectCache) {
+      this.indexWriters = (IndexWriter[]) objectCache
+          .getObject(IndexWriter.class.getName());
+      if (this.indexWriters == null) {
+        try {
+          ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
+              IndexWriter.X_POINT_ID);
+          if (point == null)
+            throw new RuntimeException(IndexWriter.X_POINT_ID + " not found.");
+          Extension[] extensions = point.getExtensions();
+          HashMap<String, IndexWriter> indexerMap = new HashMap<String, IndexWriter>();
+          for (int i = 0; i < extensions.length; i++) {
+            Extension extension = extensions[i];
+            IndexWriter writer = (IndexWriter) extension.getExtensionInstance();
+            LOG.info("Adding " + writer.getClass().getName());
+            if (!indexerMap.containsKey(writer.getClass().getName())) {
+              indexerMap.put(writer.getClass().getName(), writer);
+            }
+          }
+          objectCache.setObject(IndexWriter.class.getName(), indexerMap
+              .values().toArray(new IndexWriter[0]));
+        } catch (PluginRuntimeException e) {
+          throw new RuntimeException(e);
+        }
+        this.indexWriters = (IndexWriter[]) objectCache
+            .getObject(IndexWriter.class.getName());
+      }
+    }
+  }
+
+  public void open(JobConf job, String name) throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].open(job, name);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void write(NutchDocument doc) throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].write(doc);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void update(NutchDocument doc) throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].update(doc);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void delete(String key) throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].delete(key);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void close() throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].close();
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void commit() throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].commit();
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  // lists the active IndexWriters and their configuration
+  public String describe() throws IOException {
+    StringBuffer buffer = new StringBuffer();
+    if (this.indexWriters.length == 0)
+      buffer.append("No IndexWriters activated - check your configuration\n");
+    else
+      buffer.append("Active IndexWriters :\n");
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      buffer.append(this.indexWriters[i].describe()).append("\n");
+    }
+    return buffer.toString();
+  }
+
+}
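
The CleaningJob shown earlier drives this facade from its DeleterReducer. A compressed sketch
of that lifecycle; the class name and the URL are hypothetical:

import org.apache.hadoop.mapred.JobConf;
import org.apache.nutch.indexer.IndexWriters;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchJob;

public class IndexWritersExample {
  public static void main(String[] args) throws Exception {
    JobConf job = new NutchJob(NutchConfiguration.create());

    // Fans out to every IndexWriter plugin enabled via plugin.includes
    IndexWriters writers = new IndexWriters(job);
    System.err.println(writers.describe());

    writers.open(job, "Deletion");
    writers.delete("http://www.example.org/gone-page.html"); // hypothetical URL/key
    writers.commit();
    writers.close();
  }
}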

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java
new file mode 100644
index 0000000..5025525
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+
+public class IndexerMapReduce extends Configured implements
+    Mapper<Text, Writable, Text, NutchWritable>,
+    Reducer<Text, NutchWritable, Text, NutchIndexAction> {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(IndexerMapReduce.class);
+
+  public static final String INDEXER_PARAMS = "indexer.additional.params";
+  public static final String INDEXER_DELETE = "indexer.delete";
+  public static final String INDEXER_DELETE_ROBOTS_NOINDEX = "indexer.delete.robots.noindex";
+  public static final String INDEXER_DELETE_SKIPPED = "indexer.delete.skipped.by.indexingfilter";
+  public static final String INDEXER_SKIP_NOTMODIFIED = "indexer.skip.notmodified";
+  public static final String URL_FILTERING = "indexer.url.filters";
+  public static final String URL_NORMALIZING = "indexer.url.normalizers";
+  public static final String INDEXER_BINARY_AS_BASE64 = "indexer.binary.base64";
+
+  private boolean skip = false;
+  private boolean delete = false;
+  private boolean deleteRobotsNoIndex = false;
+  private boolean deleteSkippedByIndexingFilter = false;
+  private boolean base64 = false;
+  private IndexingFilters filters;
+  private ScoringFilters scfilters;
+
+  // using normalizers and/or filters
+  private boolean normalize = false;
+  private boolean filter = false;
+
+  // url normalizers, filters and job configuration
+  private URLNormalizers urlNormalizers;
+  private URLFilters urlFilters;
+
+  /** Predefined action to delete documents from the index */
+  private static final NutchIndexAction DELETE_ACTION = new NutchIndexAction(
+      null, NutchIndexAction.DELETE);
+
+  public void configure(JobConf job) {
+    setConf(job);
+    this.filters = new IndexingFilters(getConf());
+    this.scfilters = new ScoringFilters(getConf());
+    this.delete = job.getBoolean(INDEXER_DELETE, false);
+    this.deleteRobotsNoIndex = job.getBoolean(INDEXER_DELETE_ROBOTS_NOINDEX,
+        false);
+    this.deleteSkippedByIndexingFilter = job.getBoolean(INDEXER_DELETE_SKIPPED,
+        false);
+    this.skip = job.getBoolean(INDEXER_SKIP_NOTMODIFIED, false);
+    this.base64 = job.getBoolean(INDEXER_BINARY_AS_BASE64, false);
+
+    normalize = job.getBoolean(URL_NORMALIZING, false);
+    filter = job.getBoolean(URL_FILTERING, false);
+
+    if (normalize) {
+      urlNormalizers = new URLNormalizers(getConf(),
+          URLNormalizers.SCOPE_INDEXER);
+    }
+
+    if (filter) {
+      urlFilters = new URLFilters(getConf());
+    }
+  }
+
+  /**
+   * Normalizes and trims extra whitespace from the given url.
+   * 
+   * @param url
+   *          The url to normalize.
+   * 
+   * @return The normalized url.
+   */
+  private String normalizeUrl(String url) {
+    if (!normalize) {
+      return url;
+    }
+
+    String normalized = null;
+    if (urlNormalizers != null) {
+      try {
+        // normalize and trim the url
+        normalized = urlNormalizers
+            .normalize(url, URLNormalizers.SCOPE_INDEXER);
+        normalized = normalized.trim();
+      } catch (Exception e) {
+        LOG.warn("Skipping {}: {}", url, e);
+        normalized = null;
+      }
+    }
+
+    return normalized;
+  }
+
+  /**
+   * Filters the given url.
+   * 
+   * @param url
+   *          The url to filter.
+   * 
+   * @return The filtered url or null.
+   */
+  private String filterUrl(String url) {
+    if (!filter) {
+      return url;
+    }
+
+    try {
+      url = urlFilters.filter(url);
+    } catch (Exception e) {
+      LOG.warn("Skipping {}: {}", url, e);
+      url = null;
+    }
+
+    return url;
+  }
+
+  public void map(Text key, Writable value,
+      OutputCollector<Text, NutchWritable> output, Reporter reporter)
+          throws IOException {
+
+    String urlString = filterUrl(normalizeUrl(key.toString()));
+    if (urlString == null) {
+      return;
+    } else {
+      key.set(urlString);
+    }
+
+    output.collect(key, new NutchWritable(value));
+  }
+
+  public void reduce(Text key, Iterator<NutchWritable> values,
+      OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
+          throws IOException {
+    Inlinks inlinks = null;
+    CrawlDatum dbDatum = null;
+    CrawlDatum fetchDatum = null;
+    Content content = null;
+    ParseData parseData = null;
+    ParseText parseText = null;
+
+    while (values.hasNext()) {
+      final Writable value = values.next().get(); // unwrap
+      if (value instanceof Inlinks) {
+        inlinks = (Inlinks) value;
+      } else if (value instanceof CrawlDatum) {
+        final CrawlDatum datum = (CrawlDatum) value;
+        if (CrawlDatum.hasDbStatus(datum)) {
+          dbDatum = datum;
+        } else if (CrawlDatum.hasFetchStatus(datum)) {
+          // don't index unmodified (empty) pages
+          if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
+            fetchDatum = datum;
+          }
+        } else if (CrawlDatum.STATUS_LINKED == datum.getStatus()
+            || CrawlDatum.STATUS_SIGNATURE == datum.getStatus()
+            || CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
+          continue;
+        } else {
+          throw new RuntimeException("Unexpected status: " + datum.getStatus());
+        }
+      } else if (value instanceof ParseData) {
+        parseData = (ParseData) value;
+
+        // Handle the robots meta tag, see https://issues.apache.org/jira/browse/NUTCH-1434
+        if (deleteRobotsNoIndex) {
+          // Get the robots meta data
+          String robotsMeta = parseData.getMeta("robots");
+
+          // Does it carry a noindex directive for this URL?
+          if (robotsMeta != null
+              && robotsMeta.toLowerCase().contains("noindex")) {
+            // Delete it!
+            output.collect(key, DELETE_ACTION);
+            reporter.incrCounter("IndexerStatus", "deleted (robots=noindex)", 1);
+            return;
+          }
+        }
+      } else if (value instanceof ParseText) {
+        parseText = (ParseText) value;
+      } else if (value instanceof Content) {
+        content = (Content)value;
+      } else if (LOG.isWarnEnabled()) {
+        LOG.warn("Unrecognized type: " + value.getClass());
+      }
+    }
+
+    // Whether to delete GONE or REDIRECTS
+    if (delete && fetchDatum != null && dbDatum != null) {
+      if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE
+          || dbDatum.getStatus() == CrawlDatum.STATUS_DB_GONE) {
+        reporter.incrCounter("IndexerStatus", "deleted (gone)", 1);
+        output.collect(key, DELETE_ACTION);
+        return;
+      }
+
+      if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
+          || fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
+          || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM
+          || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+        reporter.incrCounter("IndexerStatus", "deleted (redirects)", 1);
+        output.collect(key, DELETE_ACTION);
+        return;
+      }
+    }
+
+    if (fetchDatum == null || dbDatum == null || parseText == null
+        || parseData == null) {
+      return; // only have inlinks
+    }
+
+    // Whether to delete pages marked as duplicates
+    if (delete && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+      reporter.incrCounter("IndexerStatus", "deleted (duplicates)", 1);
+      output.collect(key, DELETE_ACTION);
+      return;
+    }
+
+    // Whether to skip DB_NOTMODIFIED pages
+    if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+      reporter.incrCounter("IndexerStatus", "skipped (not modified)", 1);
+      return;
+    }
+
+    if (!parseData.getStatus().isSuccess()
+        || fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
+      return;
+    }
+
+    NutchDocument doc = new NutchDocument();
+    doc.add("id", key.toString());
+
+    final Metadata metadata = parseData.getContentMeta();
+
+    // add segment, used to map from merged index back to segment files
+    doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));
+
+    // add digest, used by dedup
+    doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
+    
+    final Parse parse = new ParseImpl(parseText, parseData);
+    float boost = 1.0f;
+    // run scoring filters
+    try {
+      boost = this.scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse,
+          inlinks, boost);
+    } catch (final ScoringFilterException e) {
+      reporter.incrCounter("IndexerStatus", "errors (ScoringFilter)", 1);
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Error calculating score {}: {}", key, e);
+      }
+      return;
+    }
+    // apply boost to all indexed fields.
+    doc.setWeight(boost);
+    // store boost for use by explain and dedup
+    doc.add("boost", Float.toString(boost));
+
+    try {
+      // Indexing filters may also be interested in the signature
+      fetchDatum.setSignature(dbDatum.getSignature());
+      
+      // extract information from dbDatum and pass it to
+      // fetchDatum so that indexing filters can use it
+      final Text url = (Text) dbDatum.getMetaData().get(
+          Nutch.WRITABLE_REPR_URL_KEY);
+      if (url != null) {
+        // Representation URL also needs normalization and filtering.
+        // If repr URL is excluded by filters we still accept this document
+        // but represented by its primary URL ("key") which has passed URL
+        // filters.
+        String urlString = filterUrl(normalizeUrl(url.toString()));
+        if (urlString != null) {
+          url.set(urlString);
+          fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
+        }
+      }
+      // run indexing filters
+      doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
+    } catch (final IndexingException e) {
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Error indexing " + key + ": " + e);
+      }
+      reporter.incrCounter("IndexerStatus", "errors (IndexingFilter)", 1);
+      return;
+    }
+
+    // skip documents discarded by indexing filters
+    if (doc == null) {
+      // https://issues.apache.org/jira/browse/NUTCH-1449
+      if (deleteSkippedByIndexingFilter) {
+        NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+        output.collect(key, action);
+        reporter.incrCounter("IndexerStatus", "deleted (IndexingFilter)", 1);
+      } else {
+        reporter.incrCounter("IndexerStatus", "skipped (IndexingFilter)", 1);
+      }
+      return;
+    }
+
+    if (content != null) {
+      // Add the original binary content
+      String binary;
+      if (base64) {
+        // optionally encode as base64
+        binary = Base64.encodeBase64String(content.getContent());
+      } else {
+        // decode with an explicit charset (UTF-8) instead of the platform default
+        binary = StringUtils.newStringUtf8(content.getContent());
+      }
+      doc.add("binaryContent", binary);
+    }
+
+    reporter.incrCounter("IndexerStatus", "indexed (add/update)", 1);
+
+    NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
+    output.collect(key, action);
+  }
+
+  public void close() throws IOException {
+  }
+
+  public static void initMRJob(Path crawlDb, Path linkDb,
+      Collection<Path> segments, JobConf job, boolean addBinaryContent) {
+
+    LOG.info("IndexerMapReduce: crawldb: {}", crawlDb);
+
+    if (linkDb != null)
+      LOG.info("IndexerMapReduce: linkdb: {}", linkDb);
+
+    for (final Path segment : segments) {
+      LOG.info("IndexerMapReduces: adding segment: {}", segment);
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.FETCH_DIR_NAME));
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.PARSE_DIR_NAME));
+      FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
+      FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
+
+      if (addBinaryContent) {
+        FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
+      }
+    }
+
+    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
+
+    if (linkDb != null) {
+      Path currentLinkDb = new Path(linkDb, LinkDb.CURRENT_NAME);
+      try {
+        if (FileSystem.get(job).exists(currentLinkDb)) {
+          FileInputFormat.addInputPath(job, currentLinkDb);
+        } else {
+          LOG.warn("Ignoring linkDb for indexing, no linkDb found in path: {}",
+              linkDb);
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to use linkDb ({}) for indexing: {}", linkDb,
+            org.apache.hadoop.util.StringUtils.stringifyException(e));
+      }
+    }
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(IndexerMapReduce.class);
+    job.setReducerClass(IndexerMapReduce.class);
+
+    job.setOutputFormat(IndexerOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NutchWritable.class);
+    job.setOutputValueClass(NutchIndexAction.class);
+  }
+}
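
For orientation only, here is a minimal sketch (not taken from the patch) of how a driver could toggle the indexer.* flags read in configure() before handing the JobConf to initMRJob; the class name and paths below are hypothetical, and in the code base this wiring is normally done by the IndexingJob tool:

    import java.util.Arrays;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.indexer.IndexerMapReduce;
    import org.apache.nutch.util.NutchConfiguration;
    import org.apache.nutch.util.NutchJob;

    public class IndexerJobSketch {
      public static void main(String[] args) {
        // Hypothetical crawl layout; adjust to your own paths.
        Path crawlDb = new Path("crawl/crawldb");
        Path linkDb = new Path("crawl/linkdb");
        Path segment = new Path("crawl/segments/20160716000000");

        JobConf job = new NutchJob(NutchConfiguration.create());
        job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);           // delete gone/redirected pages
        job.setBoolean(IndexerMapReduce.URL_NORMALIZING, true);          // normalize URL keys (indexer scope)
        job.setBoolean(IndexerMapReduce.URL_FILTERING, true);            // drop URL keys rejected by URL filters
        job.setBoolean(IndexerMapReduce.INDEXER_BINARY_AS_BASE64, true); // store binaryContent base64-encoded

        // Wires mapper/reducer, input paths and IndexerOutputFormat; the last
        // argument additionally adds the raw Content directories as input.
        IndexerMapReduce.initMRJob(crawlDb, linkDb, Arrays.asList(segment), job, true);
      }
    }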

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java
new file mode 100644
index 0000000..baa9ce6
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+public class IndexerOutputFormat extends
+    FileOutputFormat<Text, NutchIndexAction> {
+
+  @Override
+  public RecordWriter<Text, NutchIndexAction> getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+
+    final IndexWriters writers = new IndexWriters(job);
+
+    writers.open(job, name);
+
+    return new RecordWriter<Text, NutchIndexAction>() {
+
+      public void close(Reporter reporter) throws IOException {
+        writers.close();
+      }
+
+      public void write(Text key, NutchIndexAction indexAction)
+          throws IOException {
+        if (indexAction.action == NutchIndexAction.ADD) {
+          writers.write(indexAction.doc);
+        } else if (indexAction.action == NutchIndexAction.DELETE) {
+          writers.delete(key.toString());
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java
new file mode 100644
index 0000000..adfefeb
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+@SuppressWarnings("serial")
+public class IndexingException extends Exception {
+
+  public IndexingException() {
+    super();
+  }
+
+  public IndexingException(String message) {
+    super(message);
+  }
+
+  public IndexingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public IndexingException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java
new file mode 100644
index 0000000..f22a0e5
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+// Hadoop imports
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Text;
+
+// Nutch imports
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.plugin.Pluggable;
+
+/**
+ * Extension point for indexing. Permits one to add metadata to the indexed
+ * fields. All plugins found which implement this extension point are run
+ * sequentially on the parse.
+ */
+public interface IndexingFilter extends Pluggable, Configurable {
+  /** The name of the extension point. */
+  final static String X_POINT_ID = IndexingFilter.class.getName();
+
+  /**
+   * Adds fields or otherwise modifies the document that will be indexed for a
+   * parse. Unwanted documents can be removed from indexing by returning a null
+   * value.
+   * 
+   * @param doc
+   *          document instance for collecting fields
+   * @param parse
+   *          parse data instance
+   * @param url
+   *          page url
+   * @param datum
+   *          crawl datum for the page (fetch datum from segment containing
+   *          fetch status and fetch time)
+   * @param inlinks
+   *          page inlinks
+   * @return modified (or a new) document instance, or null (meaning the
+   *         document should be discarded)
+   * @throws IndexingException if an error occurs while running the filter
+   */
+  NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+      CrawlDatum datum, Inlinks inlinks) throws IndexingException;
+}
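
As a rough illustration of the extension point, a minimal custom filter could look like the sketch below (hypothetical class name and behaviour, not part of the patch); a real plugin would additionally need the usual plugin.xml descriptor binding it to this extension point:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.crawl.Inlinks;
    import org.apache.nutch.indexer.IndexingException;
    import org.apache.nutch.indexer.IndexingFilter;
    import org.apache.nutch.indexer.NutchDocument;
    import org.apache.nutch.parse.Parse;

    /** Hypothetical filter: indexes the parse title and discards untitled pages. */
    public class TitleIndexingFilter implements IndexingFilter {

      private Configuration conf;

      @Override
      public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
          CrawlDatum datum, Inlinks inlinks) throws IndexingException {
        String title = parse.getData().getTitle();
        if (title == null || title.trim().isEmpty()) {
          return null; // discarding the document removes it from indexing
        }
        doc.add("title", title.trim());
        return doc;
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Configuration getConf() {
        return conf;
      }
    }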

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java
new file mode 100644
index 0000000..334fcad
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+// SLF4J Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.parse.Parse;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.hadoop.io.Text;
+
+/** Creates and caches {@link IndexingFilter} implementing plugins. */
+public class IndexingFilters {
+
+  public static final String INDEXINGFILTER_ORDER = "indexingfilter.order";
+
+  public final static Logger LOG = LoggerFactory
+      .getLogger(IndexingFilters.class);
+
+  private IndexingFilter[] indexingFilters;
+
+  public IndexingFilters(Configuration conf) {
+    indexingFilters = (IndexingFilter[]) PluginRepository.get(conf)
+        .getOrderedPlugins(IndexingFilter.class, IndexingFilter.X_POINT_ID,
+            INDEXINGFILTER_ORDER);
+  }
+
+  /** Run all defined filters in order; returns null if any filter discards the document. */
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+      CrawlDatum datum, Inlinks inlinks) throws IndexingException {
+    for (int i = 0; i < this.indexingFilters.length; i++) {
+      doc = this.indexingFilters[i].filter(doc, parse, url, datum, inlinks);
+      // break the loop if an indexing filter discards the doc
+      if (doc == null)
+        return null;
+    }
+
+    return doc;
+  }
+
+}
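
A short usage sketch for the chain above (hypothetical helper, not from the patch); the indexingfilter.order value shown assumes the stock index-basic plugin class and is only an example:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.crawl.Inlinks;
    import org.apache.nutch.indexer.IndexingException;
    import org.apache.nutch.indexer.IndexingFilters;
    import org.apache.nutch.indexer.NutchDocument;
    import org.apache.nutch.parse.Parse;
    import org.apache.nutch.util.NutchConfiguration;

    public class FilterChainSketch {

      // Applies the configured filter chain to one document; a null return means
      // some filter in the chain discarded the page.
      public static NutchDocument apply(NutchDocument doc, Parse parse, Text url,
          CrawlDatum fetchDatum, Inlinks inlinks) throws IndexingException {
        Configuration conf = NutchConfiguration.create();
        // Optional: pin the execution order instead of relying on plugin discovery order.
        conf.set(IndexingFilters.INDEXINGFILTER_ORDER,
            "org.apache.nutch.indexer.basic.BasicIndexingFilter");
        return new IndexingFilters(conf).filter(doc, parse, url, fetchDatum, inlinks);
      }
    }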