You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:55 UTC

[39/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java
new file mode 100644
index 0000000..a35e842
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.nutch.metadata.Metadata;
+
+/**
+ * A class which holds the number of inlinks and outlinks for a given url along
+ * with an inlink score from a link analysis program and any metadata.
+ * 
+ * The Node is the core unit of the NodeDb in the WebGraph.
+ */
+public class Node implements Writable {
+
+  private int numInlinks = 0;
+  private int numOutlinks = 0;
+  private float inlinkScore = 1.0f;
+  private Metadata metadata = new Metadata();
+
+  public Node() {
+
+  }
+
+  public int getNumInlinks() {
+    return numInlinks;
+  }
+
+  public void setNumInlinks(int numInlinks) {
+    this.numInlinks = numInlinks;
+  }
+
+  public int getNumOutlinks() {
+    return numOutlinks;
+  }
+
+  public void setNumOutlinks(int numOutlinks) {
+    this.numOutlinks = numOutlinks;
+  }
+
+  public float getInlinkScore() {
+    return inlinkScore;
+  }
+
+  public void setInlinkScore(float inlinkScore) {
+    this.inlinkScore = inlinkScore;
+  }
+
+  public float getOutlinkScore() {
+    return (numOutlinks > 0) ? inlinkScore / numOutlinks : inlinkScore;
+  }
+
+  public Metadata getMetadata() {
+    return metadata;
+  }
+
+  public void setMetadata(Metadata metadata) {
+    this.metadata = metadata;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+
+    numInlinks = in.readInt();
+    numOutlinks = in.readInt();
+    inlinkScore = in.readFloat();
+    metadata.clear();
+    metadata.readFields(in);
+  }
+
+  public void write(DataOutput out) throws IOException {
+
+    out.writeInt(numInlinks);
+    out.writeInt(numOutlinks);
+    out.writeFloat(inlinkScore);
+    metadata.write(out);
+  }
+
+  public String toString() {
+    return "num inlinks: " + numInlinks + ", num outlinks: " + numOutlinks
+        + ", inlink score: " + inlinkScore + ", outlink score: "
+        + getOutlinkScore() + ", metadata: " + metadata.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
new file mode 100644
index 0000000..4a57c29
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * A tools that dumps out the top urls by number of inlinks, number of outlinks,
+ * or by score, to a text file. One of the major uses of this tool is to check
+ * the top scoring urls of a link analysis program such as LinkRank.
+ * 
+ * For number of inlinks or number of outlinks the WebGraph program will need to
+ * have been run. For link analysis score a program such as LinkRank will need
+ * to have been run which updates the NodeDb of the WebGraph.
+ */
+public class NodeDumper extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(NodeDumper.class);
+
+  private static enum DumpType {
+    INLINKS, OUTLINKS, SCORES
+  }
+
+  private static enum AggrType {
+    SUM, MAX
+  }
+
+  private static enum NameType {
+    HOST, DOMAIN
+  }
+
+  /**
+   * Outputs the top urls sorted in descending order. Depending on the flag set
+   * on the command line, the top urls could be for number of inlinks, for
+   * number of outlinks, or for link analysis score.
+   */
+  public static class Sorter extends Configured implements
+      Mapper<Text, Node, FloatWritable, Text>,
+      Reducer<FloatWritable, Text, Text, FloatWritable> {
+
+    private JobConf conf;
+    private boolean inlinks = false;
+    private boolean outlinks = false;
+    private boolean scores = false;
+    private long topn = Long.MAX_VALUE;
+
+    /**
+     * Configures the job, sets the flag for type of content and the topN number
+     * if any.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      this.inlinks = conf.getBoolean("inlinks", false);
+      this.outlinks = conf.getBoolean("outlinks", false);
+      this.scores = conf.getBoolean("scores", true);
+      this.topn = conf.getLong("topn", Long.MAX_VALUE);
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Outputs the url with the appropriate number of inlinks, outlinks, or for
+     * score.
+     */
+    public void map(Text key, Node node,
+        OutputCollector<FloatWritable, Text> output, Reporter reporter)
+        throws IOException {
+
+      float number = 0;
+      if (inlinks) {
+        number = node.getNumInlinks();
+      } else if (outlinks) {
+        number = node.getNumOutlinks();
+      } else {
+        number = node.getInlinkScore();
+      }
+
+      // number collected with negative to be descending
+      output.collect(new FloatWritable(-number), key);
+    }
+
+    /**
+     * Flips and collects the url and numeric sort value.
+     */
+    public void reduce(FloatWritable key, Iterator<Text> values,
+        OutputCollector<Text, FloatWritable> output, Reporter reporter)
+        throws IOException {
+
+      // take the negative of the negative to get original value, sometimes 0
+      // value are a little weird
+      float val = key.get();
+      FloatWritable number = new FloatWritable(val == 0 ? 0 : -val);
+      long numCollected = 0;
+
+      // collect all values, this time with the url as key
+      while (values.hasNext() && (numCollected < topn)) {
+        Text url = WritableUtils.clone(values.next(), conf);
+        output.collect(url, number);
+        numCollected++;
+      }
+    }
+  }
+
+  /**
+   * Outputs the hosts or domains with an associated value. This value consists
+   * of either the number of inlinks, the number of outlinks or the score. The
+   * computed value is then either the sum of all parts or the top value.
+   */
+  public static class Dumper extends Configured implements
+      Mapper<Text, Node, Text, FloatWritable>,
+      Reducer<Text, FloatWritable, Text, FloatWritable> {
+
+    private JobConf conf;
+    private boolean inlinks = false;
+    private boolean outlinks = false;
+    private boolean scores = false;
+    private long topn = Long.MAX_VALUE;
+    private boolean host = false;
+    private boolean domain = false;
+    private boolean sum = false;
+    private boolean max = false;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      this.inlinks = conf.getBoolean("inlinks", false);
+      this.outlinks = conf.getBoolean("outlinks", false);
+      this.scores = conf.getBoolean("scores", true);
+      this.topn = conf.getLong("topn", Long.MAX_VALUE);
+      this.host = conf.getBoolean("host", false);
+      this.domain = conf.getBoolean("domain", false);
+      this.sum = conf.getBoolean("sum", false);
+      this.max = conf.getBoolean("max", false);
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Outputs the host or domain as key for this record and numInlinks,
+     * numOutlinks or score as the value.
+     */
+    public void map(Text key, Node node,
+        OutputCollector<Text, FloatWritable> output, Reporter reporter)
+        throws IOException {
+
+      float number = 0;
+      if (inlinks) {
+        number = node.getNumInlinks();
+      } else if (outlinks) {
+        number = node.getNumOutlinks();
+      } else {
+        number = node.getInlinkScore();
+      }
+
+      if (host) {
+        key.set(URLUtil.getHost(key.toString()));
+      } else {
+        key.set(URLUtil.getDomainName(key.toString()));
+      }
+
+      output.collect(key, new FloatWritable(number));
+    }
+
+    /**
+     * Outputs either the sum or the top value for this record.
+     */
+    public void reduce(Text key, Iterator<FloatWritable> values,
+        OutputCollector<Text, FloatWritable> output, Reporter reporter)
+        throws IOException {
+
+      long numCollected = 0;
+      float sumOrMax = 0;
+      float val = 0;
+
+      // collect all values, this time with the url as key
+      while (values.hasNext() && (numCollected < topn)) {
+        val = values.next().get();
+
+        if (sum) {
+          sumOrMax += val;
+        } else {
+          if (sumOrMax < val) {
+            sumOrMax = val;
+          }
+        }
+
+        numCollected++;
+      }
+
+      output.collect(key, new FloatWritable(sumOrMax));
+    }
+  }
+
+  /**
+   * Runs the process to dump the top urls out to a text file.
+   * 
+   * @param webGraphDb
+   *          The WebGraph from which to pull values.
+   * 
+   * @param topN
+   * @param output
+   * 
+   * @throws IOException
+   *           If an error occurs while dumping the top values.
+   */
+  public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output,
+      boolean asEff, NameType nameType, AggrType aggrType,
+      boolean asSequenceFile) throws Exception {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("NodeDumper: starting at " + sdf.format(start));
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Configuration conf = getConf();
+
+    JobConf dumper = new NutchJob(conf);
+    dumper.setJobName("NodeDumper: " + webGraphDb);
+    FileInputFormat.addInputPath(dumper, nodeDb);
+    dumper.setInputFormat(SequenceFileInputFormat.class);
+
+    if (nameType == null) {
+      dumper.setMapperClass(Sorter.class);
+      dumper.setReducerClass(Sorter.class);
+      dumper.setMapOutputKeyClass(FloatWritable.class);
+      dumper.setMapOutputValueClass(Text.class);
+    } else {
+      dumper.setMapperClass(Dumper.class);
+      dumper.setReducerClass(Dumper.class);
+      dumper.setMapOutputKeyClass(Text.class);
+      dumper.setMapOutputValueClass(FloatWritable.class);
+    }
+
+    dumper.setOutputKeyClass(Text.class);
+    dumper.setOutputValueClass(FloatWritable.class);
+    FileOutputFormat.setOutputPath(dumper, output);
+
+    if (asSequenceFile) {
+      dumper.setOutputFormat(SequenceFileOutputFormat.class);
+    } else {
+      dumper.setOutputFormat(TextOutputFormat.class);
+    }
+
+    dumper.setNumReduceTasks(1);
+    dumper.setBoolean("inlinks", type == DumpType.INLINKS);
+    dumper.setBoolean("outlinks", type == DumpType.OUTLINKS);
+    dumper.setBoolean("scores", type == DumpType.SCORES);
+
+    dumper.setBoolean("host", nameType == NameType.HOST);
+    dumper.setBoolean("domain", nameType == NameType.DOMAIN);
+    dumper.setBoolean("sum", aggrType == AggrType.SUM);
+    dumper.setBoolean("max", aggrType == AggrType.MAX);
+
+    dumper.setLong("topn", topN);
+
+    // Set equals-sign as separator for Solr's ExternalFileField
+    if (asEff) {
+      dumper.set("mapred.textoutputformat.separator", "=");
+    }
+
+    try {
+      LOG.info("NodeDumper: running");
+      JobClient.runJob(dumper);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    long end = System.currentTimeMillis();
+    LOG.info("NodeDumper: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new NodeDumper(),
+        args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the node dumper tool.
+   */
+  public int run(String[] args) throws Exception {
+
+    Options options = new Options();
+    OptionBuilder.withArgName("help");
+    OptionBuilder.withDescription("show this help message");
+    Option helpOpts = OptionBuilder.create("help");
+    options.addOption(helpOpts);
+
+    OptionBuilder.withArgName("webgraphdb");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the web graph database to use");
+    Option webGraphDbOpts = OptionBuilder.create("webgraphdb");
+    options.addOption(webGraphDbOpts);
+
+    OptionBuilder.withArgName("inlinks");
+    OptionBuilder.withDescription("show highest inlinks");
+    Option inlinkOpts = OptionBuilder.create("inlinks");
+    options.addOption(inlinkOpts);
+
+    OptionBuilder.withArgName("outlinks");
+    OptionBuilder.withDescription("show highest outlinks");
+    Option outlinkOpts = OptionBuilder.create("outlinks");
+    options.addOption(outlinkOpts);
+
+    OptionBuilder.withArgName("scores");
+    OptionBuilder.withDescription("show highest scores");
+    Option scoreOpts = OptionBuilder.create("scores");
+    options.addOption(scoreOpts);
+
+    OptionBuilder.withArgName("topn");
+    OptionBuilder.hasOptionalArg();
+    OptionBuilder.withDescription("show topN scores");
+    Option topNOpts = OptionBuilder.create("topn");
+    options.addOption(topNOpts);
+
+    OptionBuilder.withArgName("output");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the output directory to use");
+    Option outputOpts = OptionBuilder.create("output");
+    options.addOption(outputOpts);
+
+    OptionBuilder.withArgName("asEff");
+    OptionBuilder
+        .withDescription("Solr ExternalFileField compatible output format");
+    Option effOpts = OptionBuilder.create("asEff");
+    options.addOption(effOpts);
+
+    OptionBuilder.hasArgs(2);
+    OptionBuilder.withDescription("group <host|domain> <sum|max>");
+    Option groupOpts = OptionBuilder.create("group");
+    options.addOption(groupOpts);
+
+    OptionBuilder.withArgName("asSequenceFile");
+    OptionBuilder.withDescription("whether to output as a sequencefile");
+    Option sequenceFileOpts = OptionBuilder.create("asSequenceFile");
+    options.addOption(sequenceFileOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("NodeDumper", options);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      boolean inlinks = line.hasOption("inlinks");
+      boolean outlinks = line.hasOption("outlinks");
+
+      long topN = (line.hasOption("topn") ? Long.parseLong(line
+          .getOptionValue("topn")) : Long.MAX_VALUE);
+
+      // get the correct dump type
+      String output = line.getOptionValue("output");
+      DumpType type = (inlinks ? DumpType.INLINKS
+          : outlinks ? DumpType.OUTLINKS : DumpType.SCORES);
+
+      NameType nameType = null;
+      AggrType aggrType = null;
+      String[] group = line.getOptionValues("group");
+      if (group != null && group.length == 2) {
+        nameType = (group[0].equals("host") ? NameType.HOST : group[0]
+            .equals("domain") ? NameType.DOMAIN : null);
+        aggrType = (group[1].equals("sum") ? AggrType.SUM : group[1]
+            .equals("sum") ? AggrType.MAX : null);
+      }
+
+      // Use ExternalFileField?
+      boolean asEff = line.hasOption("asEff");
+      boolean asSequenceFile = line.hasOption("asSequenceFile");
+
+      dumpNodes(new Path(webGraphDb), type, topN, new Path(output), asEff,
+          nameType, aggrType, asSequenceFile);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("NodeDumper: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java
new file mode 100644
index 0000000..e6b6815
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * Reads and prints to system out information for a single node from the NodeDb
+ * in the WebGraph.
+ */
+public class NodeReader extends Configured {
+
+  private FileSystem fs;
+  private MapFile.Reader[] nodeReaders;
+
+  public NodeReader() {
+
+  }
+
+  public NodeReader(Configuration conf) {
+    super(conf);
+  }
+
+  /**
+   * Prints the content of the Node represented by the url to system out.
+   * 
+   * @param webGraphDb
+   *          The webgraph from which to get the node.
+   * @param url
+   *          The url of the node.
+   * 
+   * @throws IOException
+   *           If an error occurs while getting the node.
+   */
+  public void dumpUrl(Path webGraphDb, String url) throws IOException {
+
+    fs = FileSystem.get(getConf());
+    nodeReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
+        WebGraph.NODE_DIR), getConf());
+
+    // open the readers, get the node, print out the info, and close the readers
+    Text key = new Text(url);
+    Node node = new Node();
+    MapFileOutputFormat.getEntry(nodeReaders,
+        new HashPartitioner<Text, Node>(), key, node);
+    System.out.println(url + ":");
+    System.out.println("  inlink score: " + node.getInlinkScore());
+    System.out.println("  outlink score: " + node.getOutlinkScore());
+    System.out.println("  num inlinks: " + node.getNumInlinks());
+    System.out.println("  num outlinks: " + node.getNumOutlinks());
+    FSUtils.closeReaders(nodeReaders);
+  }
+
+  /**
+   * Runs the NodeReader tool. The command line arguments must contain a
+   * webgraphdb path and a url. The url must match the normalized url that is
+   * contained in the NodeDb of the WebGraph.
+   */
+  public static void main(String[] args) throws Exception {
+
+    Options options = new Options();
+    OptionBuilder.withArgName("help");
+    OptionBuilder.withDescription("show this help message");
+    Option helpOpts = OptionBuilder.create("help");
+    options.addOption(helpOpts);
+
+    OptionBuilder.withArgName("webgraphdb");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the webgraphdb to use");
+    Option webGraphOpts = OptionBuilder.create("webgraphdb");
+    options.addOption(webGraphOpts);
+
+    OptionBuilder.withArgName("url");
+    OptionBuilder.hasOptionalArg();
+    OptionBuilder.withDescription("the url to dump");
+    Option urlOpts = OptionBuilder.create("url");
+    options.addOption(urlOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      // command line must take a webgraphdb and a url
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")
+          || !line.hasOption("url")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("WebGraphReader", options);
+        return;
+      }
+
+      // dump the values to system out and return
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      String url = line.getOptionValue("url");
+      NodeReader reader = new NodeReader(NutchConfiguration.create());
+      reader.dumpUrl(new Path(webGraphDb), url);
+
+      return;
+    } catch (Exception e) {
+      e.printStackTrace();
+      return;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
new file mode 100644
index 0000000..19704eb
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * Updates the score from the WebGraph node database into the crawl database.
+ * Any score that is not in the node database is set to the clear score in the
+ * crawl database.
+ */
+public class ScoreUpdater extends Configured implements Tool,
+    Mapper<Text, Writable, Text, ObjectWritable>,
+    Reducer<Text, ObjectWritable, Text, CrawlDatum> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ScoreUpdater.class);
+
+  private JobConf conf;
+  private float clearScore = 0.0f;
+
+  public void configure(JobConf conf) {
+    this.conf = conf;
+    clearScore = conf.getFloat("link.score.updater.clear.score", 0.0f);
+  }
+
+  /**
+   * Changes input into ObjectWritables.
+   */
+  public void map(Text key, Writable value,
+      OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+      throws IOException {
+
+    ObjectWritable objWrite = new ObjectWritable();
+    objWrite.set(value);
+    output.collect(key, objWrite);
+  }
+
+  /**
+   * Creates new CrawlDatum objects with the updated score from the NodeDb or
+   * with a cleared score.
+   */
+  public void reduce(Text key, Iterator<ObjectWritable> values,
+      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+      throws IOException {
+
+    String url = key.toString();
+    Node node = null;
+    CrawlDatum datum = null;
+
+    // set the node and the crawl datum, should be one of each unless no node
+    // for url in the crawldb
+    while (values.hasNext()) {
+      ObjectWritable next = values.next();
+      Object value = next.get();
+      if (value instanceof Node) {
+        node = (Node) value;
+      } else if (value instanceof CrawlDatum) {
+        datum = (CrawlDatum) value;
+      }
+    }
+
+    // datum should never be null, could happen if somehow the url was
+    // normalized or changed after being pulled from the crawldb
+    if (datum != null) {
+
+      if (node != null) {
+
+        // set the inlink score in the nodedb
+        float inlinkScore = node.getInlinkScore();
+        datum.setScore(inlinkScore);
+        LOG.debug(url + ": setting to score " + inlinkScore);
+      } else {
+
+        // clear out the score in the crawldb
+        datum.setScore(clearScore);
+        LOG.debug(url + ": setting to clear score of " + clearScore);
+      }
+
+      output.collect(key, datum);
+    } else {
+      LOG.debug(url + ": no datum");
+    }
+  }
+
+  public void close() {
+  }
+
+  /**
+   * Updates the inlink score in the web graph node databsae into the crawl
+   * database.
+   * 
+   * @param crawlDb
+   *          The crawl database to update
+   * @param webGraphDb
+   *          The webgraph database to use.
+   * 
+   * @throws IOException
+   *           If an error occurs while updating the scores.
+   */
+  public void update(Path crawlDb, Path webGraphDb) throws IOException {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("ScoreUpdater: starting at " + sdf.format(start));
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    // create a temporary crawldb with the new scores
+    LOG.info("Running crawldb update " + crawlDb);
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+    Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random()
+        .nextInt(Integer.MAX_VALUE)));
+
+    // run the updater job outputting to the temp crawl database
+    JobConf updater = new NutchJob(conf);
+    updater.setJobName("Update CrawlDb from WebGraph");
+    FileInputFormat.addInputPath(updater, crawlDbCurrent);
+    FileInputFormat.addInputPath(updater, nodeDb);
+    FileOutputFormat.setOutputPath(updater, newCrawlDb);
+    updater.setInputFormat(SequenceFileInputFormat.class);
+    updater.setMapperClass(ScoreUpdater.class);
+    updater.setReducerClass(ScoreUpdater.class);
+    updater.setMapOutputKeyClass(Text.class);
+    updater.setMapOutputValueClass(ObjectWritable.class);
+    updater.setOutputKeyClass(Text.class);
+    updater.setOutputValueClass(CrawlDatum.class);
+    updater.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      JobClient.runJob(updater);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+
+      // remove the temp crawldb on error
+      if (fs.exists(newCrawlDb)) {
+        fs.delete(newCrawlDb, true);
+      }
+      throw e;
+    }
+
+    // install the temp crawl database
+    LOG.info("ScoreUpdater: installing new crawldb " + crawlDb);
+    CrawlDb.install(updater, crawlDb);
+
+    long end = System.currentTimeMillis();
+    LOG.info("ScoreUpdater: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new ScoreUpdater(),
+        args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the ScoreUpdater tool.
+   */
+  public int run(String[] args) throws Exception {
+
+    Options options = new Options();
+    OptionBuilder.withArgName("help");
+    OptionBuilder.withDescription("show this help message");
+    Option helpOpts = OptionBuilder.create("help");
+    options.addOption(helpOpts);
+
+    OptionBuilder.withArgName("crawldb");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the crawldb to use");
+    Option crawlDbOpts = OptionBuilder.create("crawldb");
+    options.addOption(crawlDbOpts);
+
+    OptionBuilder.withArgName("webgraphdb");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the webgraphdb to use");
+    Option webGraphOpts = OptionBuilder.create("webgraphdb");
+    options.addOption(webGraphOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")
+          || !line.hasOption("crawldb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("ScoreUpdater", options);
+        return -1;
+      }
+
+      String crawlDb = line.getOptionValue("crawldb");
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      update(new Path(crawlDb), new Path(webGraphDb));
+      return 0;
+    } catch (Exception e) {
+      LOG.error("ScoreUpdater: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java
new file mode 100644
index 0000000..e2c3d8b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java
@@ -0,0 +1,783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Creates three databases, one for inlinks, one for outlinks, and a node
+ * database that holds the number of in and outlinks to a url and the current
+ * score for the url.
+ * 
+ * The score is set by an analysis program such as LinkRank. The WebGraph is an
+ * update-able database. Outlinks are stored by their fetch time or by the
+ * current system time if no fetch time is available. Only the most recent
+ * version of outlinks for a given url is stored. As more crawls are executed
+ * and the WebGraph updated, newer Outlinks will replace older Outlinks. This
+ * allows the WebGraph to adapt to changes in the link structure of the web.
+ * 
+ * The Inlink database is created from the Outlink database and is regenerated
+ * when the WebGraph is updated. The Node database is created from both the
+ * Inlink and Outlink databases. Because the Node database is overwritten when
+ * the WebGraph is updated and because the Node database holds current scores
+ * for urls it is recommended that a crawl-cycle (one or more full crawls) fully
+ * complete before the WebGraph is updated and some type of analysis, such as
+ * LinkRank, is run to update scores in the Node database in a stable fashion.
+ */
+public class WebGraph extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(WebGraph.class);
+  public static final String LOCK_NAME = ".locked";
+  public static final String INLINK_DIR = "inlinks";
+  public static final String OUTLINK_DIR = "outlinks/current";
+  public static final String OLD_OUTLINK_DIR = "outlinks/old";
+  public static final String NODE_DIR = "nodes";
+
+  /**
+   * The OutlinkDb creates a database of all outlinks. Outlinks to internal urls
+   * by domain and host can be ignored. The number of Outlinks out to a given
+   * page or domain can also be limited.
+   */
+  public static class OutlinkDb extends Configured implements
+      Mapper<Text, Writable, Text, NutchWritable>,
+      Reducer<Text, NutchWritable, Text, LinkDatum> {
+
+    public static final String URL_NORMALIZING = "webgraph.url.normalizers";
+    public static final String URL_FILTERING = "webgraph.url.filters";
+
+    // ignoring internal domains, internal hosts
+    private boolean ignoreDomain = true;
+    private boolean ignoreHost = true;
+
+    // limiting urls out to a page or to a domain
+    private boolean limitPages = true;
+    private boolean limitDomains = true;
+
+    // using normalizers and/or filters
+    private boolean normalize = false;
+    private boolean filter = false;
+
+    // url normalizers, filters and job configuration
+    private URLNormalizers urlNormalizers;
+    private URLFilters filters;
+    private JobConf conf;
+
+    /**
+     * Normalizes and trims extra whitespace from the given url.
+     * 
+     * @param url
+     *          The url to normalize.
+     * 
+     * @return The normalized url.
+     */
+    private String normalizeUrl(String url) {
+
+      if (!normalize) {
+        return url;
+      }
+
+      String normalized = null;
+      if (urlNormalizers != null) {
+        try {
+
+          // normalize and trim the url
+          normalized = urlNormalizers.normalize(url,
+              URLNormalizers.SCOPE_DEFAULT);
+          normalized = normalized.trim();
+        } catch (Exception e) {
+          LOG.warn("Skipping " + url + ":" + e);
+          normalized = null;
+        }
+      }
+      return normalized;
+    }
+
+    /**
+     * Filters the given url.
+     * 
+     * @param url
+     *          The url to filter.
+     * 
+     * @return The filtered url or null.
+     */
+    private String filterUrl(String url) {
+
+      if (!filter) {
+        return url;
+      }
+
+      try {
+        url = filters.filter(url);
+      } catch (Exception e) {
+        url = null;
+      }
+
+      return url;
+    }
+
+    /**
+     * Returns the fetch time from the parse data or the current system time if
+     * the fetch time doesn't exist.
+     * 
+     * @param data
+     *          The parse data.
+     * 
+     * @return The fetch time as a long.
+     */
+    private long getFetchTime(ParseData data) {
+
+      // default to current system time
+      long fetchTime = System.currentTimeMillis();
+      String fetchTimeStr = data.getContentMeta().get(Nutch.FETCH_TIME_KEY);
+      try {
+        // get the fetch time from the parse data
+        fetchTime = Long.parseLong(fetchTimeStr);
+      } catch (Exception e) {
+        fetchTime = System.currentTimeMillis();
+      }
+      return fetchTime;
+    }
+
+    /**
+     * Default constructor.
+     */
+    public OutlinkDb() {
+    }
+
+    /**
+     * Configurable constructor.
+     */
+    public OutlinkDb(Configuration conf) {
+      setConf(conf);
+    }
+
+    /**
+     * Configures the OutlinkDb job. Sets up internal links and link limiting.
+     */
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      ignoreHost = conf.getBoolean("link.ignore.internal.host", true);
+      ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true);
+      limitPages = conf.getBoolean("link.ignore.limit.page", true);
+      limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+
+      normalize = conf.getBoolean(URL_NORMALIZING, false);
+      filter = conf.getBoolean(URL_FILTERING, false);
+
+      if (normalize) {
+        urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+      }
+
+      if (filter) {
+        filters = new URLFilters(conf);
+      }
+    }
+
+    /**
+     * Passes through existing LinkDatum objects from an existing OutlinkDb and
+     * maps out new LinkDatum objects from new crawls ParseData.
+     */
+    public void map(Text key, Writable value,
+        OutputCollector<Text, NutchWritable> output, Reporter reporter)
+        throws IOException {
+
+      // normalize url, stop processing if null
+      String url = normalizeUrl(key.toString());
+      if (url == null) {
+        return;
+      }
+
+      // filter url
+      if (filterUrl(url) == null) {
+        return;
+      }
+
+      // Overwrite the key with the normalized URL
+      key.set(url);
+
+      if (value instanceof CrawlDatum) {
+        CrawlDatum datum = (CrawlDatum) value;
+
+        if (datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
+            || datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
+            || datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
+
+          // Tell the reducer to get rid of all instances of this key
+          output.collect(key, new NutchWritable(new BooleanWritable(true)));
+        }
+      } else if (value instanceof ParseData) {
+        // get the parse data and the outlinks from the parse data, along with
+        // the fetch time for those links
+        ParseData data = (ParseData) value;
+        long fetchTime = getFetchTime(data);
+        Outlink[] outlinkAr = data.getOutlinks();
+        Map<String, String> outlinkMap = new LinkedHashMap<String, String>();
+
+        // normalize urls and put into map
+        if (outlinkAr != null && outlinkAr.length > 0) {
+          for (int i = 0; i < outlinkAr.length; i++) {
+            Outlink outlink = outlinkAr[i];
+            String toUrl = normalizeUrl(outlink.getToUrl());
+
+            if (filterUrl(toUrl) == null) {
+              continue;
+            }
+
+            // only put into map if the url doesn't already exist in the map or
+            // if it does and the anchor for that link is null, will replace if
+            // url is existing
+            boolean existingUrl = outlinkMap.containsKey(toUrl);
+            if (toUrl != null
+                && (!existingUrl || (existingUrl && outlinkMap.get(toUrl) == null))) {
+              outlinkMap.put(toUrl, outlink.getAnchor());
+            }
+          }
+        }
+
+        // collect the outlinks under the fetch time
+        for (String outlinkUrl : outlinkMap.keySet()) {
+          String anchor = outlinkMap.get(outlinkUrl);
+          LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime);
+          output.collect(key, new NutchWritable(datum));
+        }
+      } else if (value instanceof LinkDatum) {
+        LinkDatum datum = (LinkDatum) value;
+        String linkDatumUrl = normalizeUrl(datum.getUrl());
+
+        if (filterUrl(linkDatumUrl) != null) {
+          datum.setUrl(linkDatumUrl);
+
+          // collect existing outlinks from existing OutlinkDb
+          output.collect(key, new NutchWritable(datum));
+        }
+      }
+    }
+
+    public void reduce(Text key, Iterator<NutchWritable> values,
+        OutputCollector<Text, LinkDatum> output, Reporter reporter)
+        throws IOException {
+
+      // aggregate all outlinks, get the most recent timestamp for a fetch
+      // which should be the timestamp for all of the most recent outlinks
+      long mostRecent = 0L;
+      List<LinkDatum> outlinkList = new ArrayList<LinkDatum>();
+      while (values.hasNext()) {
+        Writable value = values.next().get();
+
+        if (value instanceof LinkDatum) {
+          // loop through, change out most recent timestamp if needed
+          LinkDatum next = (LinkDatum) value;
+          long timestamp = next.getTimestamp();
+          if (mostRecent == 0L || mostRecent < timestamp) {
+            mostRecent = timestamp;
+          }
+          outlinkList.add(WritableUtils.clone(next, conf));
+          reporter.incrCounter("WebGraph.outlinks", "added links", 1);
+        } else if (value instanceof BooleanWritable) {
+          BooleanWritable delete = (BooleanWritable) value;
+          // Actually, delete is always true, otherwise we don't emit it in the
+          // mapper in the first place
+          if (delete.get() == true) {
+            // This page is gone, do not emit it's outlinks
+            reporter.incrCounter("WebGraph.outlinks", "removed links", 1);
+            return;
+          }
+        }
+      }
+
+      // get the url, domain, and host for the url
+      String url = key.toString();
+      String domain = URLUtil.getDomainName(url);
+      String host = URLUtil.getHost(url);
+
+      // setup checking sets for domains and pages
+      Set<String> domains = new HashSet<String>();
+      Set<String> pages = new HashSet<String>();
+
+      // loop through the link datums
+      for (LinkDatum datum : outlinkList) {
+
+        // get the url, host, domain, and page for each outlink
+        String toUrl = datum.getUrl();
+        String toDomain = URLUtil.getDomainName(toUrl);
+        String toHost = URLUtil.getHost(toUrl);
+        String toPage = URLUtil.getPage(toUrl);
+        datum.setLinkType(LinkDatum.OUTLINK);
+
+        // outlinks must be the most recent and conform to internal url and
+        // limiting rules, if it does collect it
+        if (datum.getTimestamp() == mostRecent
+            && (!limitPages || (limitPages && !pages.contains(toPage)))
+            && (!limitDomains || (limitDomains && !domains.contains(toDomain)))
+            && (!ignoreHost || (ignoreHost && !toHost.equalsIgnoreCase(host)))
+            && (!ignoreDomain || (ignoreDomain && !toDomain
+                .equalsIgnoreCase(domain)))) {
+          output.collect(key, datum);
+          pages.add(toPage);
+          domains.add(toDomain);
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * The InlinkDb creates a database of Inlinks. Inlinks are inverted from the
+   * OutlinkDb LinkDatum objects and are regenerated each time the WebGraph is
+   * updated.
+   */
+  private static class InlinkDb extends Configured implements
+      Mapper<Text, LinkDatum, Text, LinkDatum> {
+
+    private long timestamp;
+
+    /**
+     * Configures job. Sets timestamp for all Inlink LinkDatum objects to the
+     * current system time.
+     */
+    public void configure(JobConf conf) {
+      timestamp = System.currentTimeMillis();
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Inverts the Outlink LinkDatum objects into new LinkDatum objects with a
+     * new system timestamp, type and to and from url switched.
+     */
+    public void map(Text key, LinkDatum datum,
+        OutputCollector<Text, LinkDatum> output, Reporter reporter)
+        throws IOException {
+
+      // get the to and from url and the anchor
+      String fromUrl = key.toString();
+      String toUrl = datum.getUrl();
+      String anchor = datum.getAnchor();
+
+      // flip the from and to url and set the new link type
+      LinkDatum inlink = new LinkDatum(fromUrl, anchor, timestamp);
+      inlink.setLinkType(LinkDatum.INLINK);
+      output.collect(new Text(toUrl), inlink);
+    }
+  }
+
+  /**
+   * Creates the Node database which consists of the number of in and outlinks
+   * for each url and a score slot for analysis programs such as LinkRank.
+   */
+  private static class NodeDb extends Configured implements
+      Reducer<Text, LinkDatum, Text, Node> {
+
+    /**
+     * Configures job.
+     */
+    public void configure(JobConf conf) {
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Counts the number of inlinks and outlinks for each url and sets a default
+     * score of 0.0 for each url (node) in the webgraph.
+     */
+    public void reduce(Text key, Iterator<LinkDatum> values,
+        OutputCollector<Text, Node> output, Reporter reporter)
+        throws IOException {
+
+      Node node = new Node();
+      int numInlinks = 0;
+      int numOutlinks = 0;
+
+      // loop through counting number of in and out links
+      while (values.hasNext()) {
+        LinkDatum next = values.next();
+        if (next.getLinkType() == LinkDatum.INLINK) {
+          numInlinks++;
+        } else if (next.getLinkType() == LinkDatum.OUTLINK) {
+          numOutlinks++;
+        }
+      }
+
+      // set the in and outlinks and a default score of 0
+      node.setNumInlinks(numInlinks);
+      node.setNumOutlinks(numOutlinks);
+      node.setInlinkScore(0.0f);
+      output.collect(key, node);
+    }
+  }
+
+  /**
+   * Creates the three different WebGraph databases, Outlinks, Inlinks, and
+   * Node. If a current WebGraph exists then it is updated, if it doesn't exist
+   * then a new WebGraph database is created.
+   * 
+   * @param webGraphDb
+   *          The WebGraph to create or update.
+   * @param segments
+   *          The array of segments used to update the WebGraph. Newer segments
+   *          and fetch times will overwrite older segments.
+   * @param normalize
+   *          whether to use URLNormalizers on URL's in the segment
+   * @param filter
+   *          whether to use URLFilters on URL's in the segment
+   * 
+   * @throws IOException
+   *           If an error occurs while processing the WebGraph.
+   */
+  public void createWebGraph(Path webGraphDb, Path[] segments,
+      boolean normalize, boolean filter) throws IOException {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("WebGraphDb: starting at " + sdf.format(start));
+      LOG.info("WebGraphDb: webgraphdb: " + webGraphDb);
+      LOG.info("WebGraphDb: URL normalize: " + normalize);
+      LOG.info("WebGraphDb: URL filter: " + filter);
+    }
+
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    // lock an existing webgraphdb to prevent multiple simultaneous updates
+    Path lock = new Path(webGraphDb, LOCK_NAME);
+    if (!fs.exists(webGraphDb)) {
+      fs.mkdirs(webGraphDb);
+    }
+
+    LockUtil.createLockFile(fs, lock, false);
+
+    // outlink and temp outlink database paths
+    Path outlinkDb = new Path(webGraphDb, OUTLINK_DIR);
+    Path oldOutlinkDb = new Path(webGraphDb, OLD_OUTLINK_DIR);
+
+    if (!fs.exists(outlinkDb)) {
+      fs.mkdirs(outlinkDb);
+    }
+
+    Path tempOutlinkDb = new Path(outlinkDb + "-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    JobConf outlinkJob = new NutchJob(conf);
+    outlinkJob.setJobName("Outlinkdb: " + outlinkDb);
+
+    boolean deleteGone = conf.getBoolean("link.delete.gone", false);
+    boolean preserveBackup = conf.getBoolean("db.preserve.backup", true);
+
+    if (deleteGone) {
+      LOG.info("OutlinkDb: deleting gone links");
+    }
+
+    // get the parse data and crawl fetch data for all segments
+    if (segments != null) {
+      for (int i = 0; i < segments.length; i++) {
+        Path parseData = new Path(segments[i], ParseData.DIR_NAME);
+        if (fs.exists(parseData)) {
+          LOG.info("OutlinkDb: adding input: " + parseData);
+          FileInputFormat.addInputPath(outlinkJob, parseData);
+        }
+
+        if (deleteGone) {
+          Path crawlFetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
+          if (fs.exists(crawlFetch)) {
+            LOG.info("OutlinkDb: adding input: " + crawlFetch);
+            FileInputFormat.addInputPath(outlinkJob, crawlFetch);
+          }
+        }
+      }
+    }
+
+    // add the existing webgraph
+    LOG.info("OutlinkDb: adding input: " + outlinkDb);
+    FileInputFormat.addInputPath(outlinkJob, outlinkDb);
+
+    outlinkJob.setBoolean(OutlinkDb.URL_NORMALIZING, normalize);
+    outlinkJob.setBoolean(OutlinkDb.URL_FILTERING, filter);
+
+    outlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    outlinkJob.setMapperClass(OutlinkDb.class);
+    outlinkJob.setReducerClass(OutlinkDb.class);
+    outlinkJob.setMapOutputKeyClass(Text.class);
+    outlinkJob.setMapOutputValueClass(NutchWritable.class);
+    outlinkJob.setOutputKeyClass(Text.class);
+    outlinkJob.setOutputValueClass(LinkDatum.class);
+    FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
+    outlinkJob.setOutputFormat(MapFileOutputFormat.class);
+    outlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    // run the outlinkdb job and replace any old outlinkdb with the new one
+    try {
+      LOG.info("OutlinkDb: running");
+      JobClient.runJob(outlinkJob);
+      LOG.info("OutlinkDb: installing " + outlinkDb);
+      FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true);
+      FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
+      if (!preserveBackup && fs.exists(oldOutlinkDb))
+        fs.delete(oldOutlinkDb, true);
+      LOG.info("OutlinkDb: finished");
+    } catch (IOException e) {
+
+      // remove lock file and and temporary directory if an error occurs
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(tempOutlinkDb)) {
+        fs.delete(tempOutlinkDb, true);
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // inlink and temp link database paths
+    Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
+    Path tempInlinkDb = new Path(inlinkDb + "-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf inlinkJob = new NutchJob(conf);
+    inlinkJob.setJobName("Inlinkdb " + inlinkDb);
+    LOG.info("InlinkDb: adding input: " + outlinkDb);
+    FileInputFormat.addInputPath(inlinkJob, outlinkDb);
+    inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+    inlinkJob.setMapperClass(InlinkDb.class);
+    inlinkJob.setMapOutputKeyClass(Text.class);
+    inlinkJob.setMapOutputValueClass(LinkDatum.class);
+    inlinkJob.setOutputKeyClass(Text.class);
+    inlinkJob.setOutputValueClass(LinkDatum.class);
+    FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
+    inlinkJob.setOutputFormat(MapFileOutputFormat.class);
+    inlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    try {
+
+      // run the inlink and replace any old with new
+      LOG.info("InlinkDb: running");
+      JobClient.runJob(inlinkJob);
+      LOG.info("InlinkDb: installing " + inlinkDb);
+      FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
+      LOG.info("InlinkDb: finished");
+    } catch (IOException e) {
+
+      // remove lock file and and temporary directory if an error occurs
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(tempInlinkDb)) {
+        fs.delete(tempInlinkDb, true);
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // node and temp node database paths
+    Path nodeDb = new Path(webGraphDb, NODE_DIR);
+    Path tempNodeDb = new Path(nodeDb + "-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    JobConf nodeJob = new NutchJob(conf);
+    nodeJob.setJobName("NodeDb " + nodeDb);
+    LOG.info("NodeDb: adding input: " + outlinkDb);
+    LOG.info("NodeDb: adding input: " + inlinkDb);
+    FileInputFormat.addInputPath(nodeJob, outlinkDb);
+    FileInputFormat.addInputPath(nodeJob, inlinkDb);
+    nodeJob.setInputFormat(SequenceFileInputFormat.class);
+    nodeJob.setReducerClass(NodeDb.class);
+    nodeJob.setMapOutputKeyClass(Text.class);
+    nodeJob.setMapOutputValueClass(LinkDatum.class);
+    nodeJob.setOutputKeyClass(Text.class);
+    nodeJob.setOutputValueClass(Node.class);
+    FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
+    nodeJob.setOutputFormat(MapFileOutputFormat.class);
+    nodeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    try {
+
+      // run the node job and replace old nodedb with new
+      LOG.info("NodeDb: running");
+      JobClient.runJob(nodeJob);
+      LOG.info("NodeDb: installing " + nodeDb);
+      FSUtils.replace(fs, nodeDb, tempNodeDb, true);
+      LOG.info("NodeDb: finished");
+    } catch (IOException e) {
+
+      // remove lock file and and temporary directory if an error occurs
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(tempNodeDb)) {
+        fs.delete(tempNodeDb, true);
+      }
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // remove the lock file for the webgraph
+    LockUtil.removeLockFile(fs, lock);
+
+    long end = System.currentTimeMillis();
+    LOG.info("WebGraphDb: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new WebGraph(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Parses command link arguments and runs the WebGraph jobs.
+   */
+  public int run(String[] args) throws Exception {
+
+    // boolean options
+    Option helpOpt = new Option("h", "help", false, "show this help message");
+    Option normOpt = new Option("n", "normalize", false,
+        "whether to use URLNormalizers on the URL's in the segment");
+    Option filtOpt = new Option("f", "filter", false,
+        "whether to use URLFilters on the URL's in the segment");
+
+    // argument options
+    @SuppressWarnings("static-access")
+    Option graphOpt = OptionBuilder
+        .withArgName("webgraphdb")
+        .hasArg()
+        .withDescription(
+            "the web graph database to create (if none exists) or use if one does")
+        .create("webgraphdb");
+    @SuppressWarnings("static-access")
+    Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
+        .withDescription("the segment(s) to use").create("segment");
+    @SuppressWarnings("static-access")
+    Option segDirOpt = OptionBuilder.withArgName("segmentDir").hasArgs()
+        .withDescription("the segment directory to use").create("segmentDir");
+
+    // create the options
+    Options options = new Options();
+    options.addOption(helpOpt);
+    options.addOption(normOpt);
+    options.addOption(filtOpt);
+    options.addOption(graphOpt);
+    options.addOption(segOpt);
+    options.addOption(segDirOpt);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")
+          || (!line.hasOption("segment") && !line.hasOption("segmentDir"))) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("WebGraph", options, true);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+
+      Path[] segPaths = null;
+
+      // Handle segment option
+      if (line.hasOption("segment")) {
+        String[] segments = line.getOptionValues("segment");
+        segPaths = new Path[segments.length];
+        for (int i = 0; i < segments.length; i++) {
+          segPaths[i] = new Path(segments[i]);
+        }
+      }
+
+      // Handle segmentDir option
+      if (line.hasOption("segmentDir")) {
+        Path dir = new Path(line.getOptionValue("segmentDir"));
+        FileSystem fs = dir.getFileSystem(getConf());
+        FileStatus[] fstats = fs.listStatus(dir,
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
+        segPaths = HadoopFSUtil.getPaths(fstats);
+      }
+
+      boolean normalize = false;
+
+      if (line.hasOption("normalize")) {
+        normalize = true;
+      }
+
+      boolean filter = false;
+
+      if (line.hasOption("filter")) {
+        filter = true;
+      }
+
+      createWebGraph(new Path(webGraphDb), segPaths, normalize, filter);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("WebGraph: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java
new file mode 100644
index 0000000..a568b46
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Scoring implementation based on link analysis
+ * ({@link org.apache.nutch.scoring.webgraph.LinkRank}),
+ * see {@link org.apache.nutch.scoring.webgraph.WebGraph}.
+ */
+package org.apache.nutch.scoring.webgraph;
+

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java b/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
new file mode 100644
index 0000000..d67b590
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * An input format that takes Nutch Content objects and converts them to text
+ * while converting newline endings to spaces. This format is useful for working
+ * with Nutch content objects in Hadoop Streaming with other languages.
+ */
+public class ContentAsTextInputFormat extends
+    SequenceFileInputFormat<Text, Text> {
+
+  private static class ContentAsTextRecordReader implements
+      RecordReader<Text, Text> {
+
+    private final SequenceFileRecordReader<Text, Content> sequenceFileRecordReader;
+
+    private Text innerKey;
+    private Content innerValue;
+
+    public ContentAsTextRecordReader(Configuration conf, FileSplit split)
+        throws IOException {
+      sequenceFileRecordReader = new SequenceFileRecordReader<Text, Content>(
+          conf, split);
+      innerKey = sequenceFileRecordReader.createKey();
+      innerValue = sequenceFileRecordReader.createValue();
+    }
+
+    public Text createKey() {
+      return new Text();
+    }
+
+    public Text createValue() {
+      return new Text();
+    }
+
+    public synchronized boolean next(Text key, Text value) throws IOException {
+
+      // convert the content object to text
+      Text tKey = key;
+      if (!sequenceFileRecordReader.next(innerKey, innerValue)) {
+        return false;
+      }
+      tKey.set(innerKey.toString());
+      String contentAsStr = new String(innerValue.getContent());
+
+      // replace new line endings with spaces
+      contentAsStr = contentAsStr.replaceAll("\n", " ");
+      value.set(contentAsStr);
+
+      return true;
+    }
+
+    public float getProgress() throws IOException {
+      return sequenceFileRecordReader.getProgress();
+    }
+
+    public synchronized long getPos() throws IOException {
+      return sequenceFileRecordReader.getPos();
+    }
+
+    public synchronized void close() throws IOException {
+      sequenceFileRecordReader.close();
+    }
+  }
+
+  public ContentAsTextInputFormat() {
+    super();
+  }
+
+  public RecordReader<Text, Text> getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter) throws IOException {
+
+    reporter.setStatus(split.toString());
+    return new ContentAsTextRecordReader(job, (FileSplit) split);
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java
new file mode 100644
index 0000000..ec601f4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks whether a segment is valid, or has a certain status (generated,
+ * fetched, parsed), or can be used safely for a certain processing step
+ * (e.g., indexing).
+ */
+public class SegmentChecker {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(SegmentChecker.class);
+
+  /**
+   * Check if the segment is indexable. May add new check methods here.
+   */
+  public static boolean isIndexable(Path segmentPath, FileSystem fs)
+      throws IOException {
+    if (segmentPath == null || fs == null) {
+      LOG.info("No segment path or filesystem set.");
+      return false;
+    }
+
+    boolean checkResult = true;
+    checkResult &= checkSegmentDir(segmentPath, fs);
+    // Add new check methods here
+
+    if (checkResult) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Check the segment to see if it is valid based on the sub directories.
+   */
+  public static boolean checkSegmentDir(Path segmentPath, FileSystem fs)
+      throws IOException {
+
+    if (segmentPath.getName().length() != 14) {
+      LOG.warn("The input path at {} is not a segment... skipping", segmentPath.getName());
+      return false;
+    }
+    
+    FileStatus[] fstats_segment = fs.listStatus(segmentPath,
+        HadoopFSUtil.getPassDirectoriesFilter(fs));
+    Path[] segment_files = HadoopFSUtil.getPaths(fstats_segment);
+
+    boolean crawlFetchExists = false;
+    boolean crawlParseExists = false;
+    boolean parseDataExists = false;
+    boolean parseTextExists = false;
+
+    for (Path path : segment_files) {
+      String pathName = path.getName();
+      crawlFetchExists |= pathName.equals(CrawlDatum.FETCH_DIR_NAME);
+      crawlParseExists |= pathName.equals(CrawlDatum.PARSE_DIR_NAME);
+      parseDataExists |= pathName.equals(ParseData.DIR_NAME);
+      parseTextExists |= pathName.equals(ParseText.DIR_NAME);
+    }
+
+    if (parseTextExists && crawlParseExists && crawlFetchExists
+        && parseDataExists) {
+
+      // No segment dir missing
+      LOG.info("Segment dir is complete: " + segmentPath.toString() + ".");
+
+      return true;
+    } else {
+
+      // log the missing dir
+      StringBuilder missingDir = new StringBuilder("");
+      if (parseDataExists == false) {
+        missingDir.append(ParseData.DIR_NAME + ", ");
+      }
+      if (parseTextExists == false) {
+        missingDir.append(ParseText.DIR_NAME + ", ");
+      }
+      if (crawlParseExists == false) {
+        missingDir.append(CrawlDatum.PARSE_DIR_NAME + ", ");
+      }
+      if (crawlFetchExists == false) {
+        missingDir.append(CrawlDatum.FETCH_DIR_NAME + ", ");
+      }
+
+      String missingDirString = missingDir.toString();
+      LOG.warn("Skipping segment: " + segmentPath.toString()
+          + ". Missing sub directories: "
+          + missingDirString.substring(0, missingDirString.length() - 2));
+
+      return false;
+    }
+
+  }
+
+  /**
+   * Check the segment to see if it is has been parsed before.
+   */
+  public static boolean isParsed(Path segment, FileSystem fs)
+      throws IOException {
+
+      if (fs.exists(new Path(segment, CrawlDatum.PARSE_DIR_NAME))){
+	return true;
+      }
+      return false;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java
new file mode 100644
index 0000000..6d53809
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Interface used to filter segments during segment merge. It allows filtering
+ * on more sophisticated criteria than just URLs. In particular it allows
+ * filtering based on metadata collected while parsing page.
+ * 
+ */
+public interface SegmentMergeFilter {
+  /** The name of the extension point. */
+  public final static String X_POINT_ID = SegmentMergeFilter.class.getName();
+
+  /**
+   * The filtering method which gets all information being merged for a given
+   * key (URL).
+   * 
+   * @return <tt>true</tt> values for this <tt>key</tt> (URL) should be merged
+   *         into the new segment.
+   */
+  public boolean filter(Text key, CrawlDatum generateData,
+      CrawlDatum fetchData, CrawlDatum sigData, Content content,
+      ParseData parseData, ParseText parseText, Collection<CrawlDatum> linked);
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java
new file mode 100644
index 0000000..7aa2de3
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.net.URLFilter;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.plugin.Extension;
+import org.apache.nutch.plugin.ExtensionPoint;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.plugin.PluginRuntimeException;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * This class wraps all {@link SegmentMergeFilter} extensions in a single object
+ * so it is easier to operate on them. If any of extensions returns
+ * <tt>false</tt> this one will return <tt>false</tt> as well.
+ * 
+ */
+public class SegmentMergeFilters {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SegmentMergeFilters.class);
+  private SegmentMergeFilter[] filters;
+
+  public SegmentMergeFilters(Configuration conf) {
+    try {
+      ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
+          SegmentMergeFilter.X_POINT_ID);
+      if (point == null)
+        throw new RuntimeException(URLFilter.X_POINT_ID + " not found.");
+      Extension[] extensions = point.getExtensions();
+      filters = new SegmentMergeFilter[extensions.length];
+      for (int i = 0; i < extensions.length; i++) {
+        filters[i] = (SegmentMergeFilter) extensions[i].getExtensionInstance();
+      }
+    } catch (PluginRuntimeException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Iterates over all {@link SegmentMergeFilter} extensions and if any of them
+   * returns false, it will return false as well.
+   * 
+   * @return <tt>true</tt> values for this <tt>key</tt> (URL) should be merged
+   *         into the new segment.
+   */
+  public boolean filter(Text key, CrawlDatum generateData,
+      CrawlDatum fetchData, CrawlDatum sigData, Content content,
+      ParseData parseData, ParseText parseText, Collection<CrawlDatum> linked) {
+    for (SegmentMergeFilter filter : filters) {
+      if (!filter.filter(key, generateData, fetchData, sigData, content,
+          parseData, parseText, linked)) {
+        if (LOG.isTraceEnabled())
+          LOG.trace("Key " + key + " dropped by " + filter.getClass().getName());
+        return false;
+      }
+    }
+    if (LOG.isTraceEnabled())
+      LOG.trace("Key " + key + " accepted for merge.");
+    return true;
+  }
+}