Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:56 UTC

[40/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java b/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java
new file mode 100644
index 0000000..9e75531
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * @author Andrzej Bialecki
+ */
+public class ProtocolStatus implements Writable {
+
+  private final static byte VERSION = 2;
+
+  /** Content was retrieved without errors. */
+  public static final int SUCCESS = 1;
+  /** Content was not retrieved. Any further errors may be indicated in args. */
+  public static final int FAILED = 2;
+
+  /** This protocol was not found. Application may attempt to retry later. */
+  public static final int PROTO_NOT_FOUND = 10;
+  /** Resource is gone. */
+  public static final int GONE = 11;
+  /** Resource has moved permanently. New url should be found in args. */
+  public static final int MOVED = 12;
+  /** Resource has moved temporarily. New url should be found in args. */
+  public static final int TEMP_MOVED = 13;
+  /** Resource was not found. */
+  public static final int NOTFOUND = 14;
+  /** Temporary failure. Application may retry immediately. */
+  public static final int RETRY = 15;
+  /**
+   * Unspecified exception occurred. Further information may be provided in args.
+   */
+  public static final int EXCEPTION = 16;
+  /** Access denied - authorization required, but missing/incorrect. */
+  public static final int ACCESS_DENIED = 17;
+  /** Access denied by robots.txt rules. */
+  public static final int ROBOTS_DENIED = 18;
+  /** Too many redirects. */
+  public static final int REDIR_EXCEEDED = 19;
+  /** Not fetching. */
+  public static final int NOTFETCHING = 20;
+  /** Unchanged since the last fetch. */
+  public static final int NOTMODIFIED = 21;
+  /**
+   * Request was refused by protocol plugins, because it would block. The
+   * expected number of milliseconds to wait before retry may be provided in
+   * args.
+   */
+  public static final int WOULDBLOCK = 22;
+  /** Thread was blocked http.max.delays times during fetching. */
+  public static final int BLOCKED = 23;
+
+  // Useful static instances for status codes that don't usually require any
+  // additional arguments.
+  public static final ProtocolStatus STATUS_SUCCESS = new ProtocolStatus(
+      SUCCESS);
+  public static final ProtocolStatus STATUS_FAILED = new ProtocolStatus(FAILED);
+  public static final ProtocolStatus STATUS_GONE = new ProtocolStatus(GONE);
+  public static final ProtocolStatus STATUS_NOTFOUND = new ProtocolStatus(
+      NOTFOUND);
+  public static final ProtocolStatus STATUS_RETRY = new ProtocolStatus(RETRY);
+  public static final ProtocolStatus STATUS_ROBOTS_DENIED = new ProtocolStatus(
+      ROBOTS_DENIED);
+  public static final ProtocolStatus STATUS_REDIR_EXCEEDED = new ProtocolStatus(
+      REDIR_EXCEEDED);
+  public static final ProtocolStatus STATUS_NOTFETCHING = new ProtocolStatus(
+      NOTFETCHING);
+  public static final ProtocolStatus STATUS_NOTMODIFIED = new ProtocolStatus(
+      NOTMODIFIED);
+  public static final ProtocolStatus STATUS_WOULDBLOCK = new ProtocolStatus(
+      WOULDBLOCK);
+  public static final ProtocolStatus STATUS_BLOCKED = new ProtocolStatus(
+      BLOCKED);
+
+  private int code;
+  private long lastModified;
+  private String[] args;
+
+  private static final HashMap<Integer, String> codeToName = new HashMap<Integer, String>();
+  static {
+    codeToName.put(SUCCESS, "success");
+    codeToName.put(FAILED, "failed");
+    codeToName.put(PROTO_NOT_FOUND, "proto_not_found");
+    codeToName.put(GONE, "gone");
+    codeToName.put(MOVED, "moved");
+    codeToName.put(TEMP_MOVED, "temp_moved");
+    codeToName.put(NOTFOUND, "notfound");
+    codeToName.put(RETRY, "retry");
+    codeToName.put(EXCEPTION, "exception");
+    codeToName.put(ACCESS_DENIED, "access_denied");
+    codeToName.put(ROBOTS_DENIED, "robots_denied");
+    codeToName.put(REDIR_EXCEEDED, "redir_exceeded");
+    codeToName.put(NOTFETCHING, "notfetching");
+    codeToName.put(NOTMODIFIED, "notmodified");
+    codeToName.put(WOULDBLOCK, "wouldblock");
+    codeToName.put(BLOCKED, "blocked");
+  }
+
+  public ProtocolStatus() {
+
+  }
+
+  public ProtocolStatus(int code, String[] args) {
+    this.code = code;
+    this.args = args;
+  }
+
+  public ProtocolStatus(int code, String[] args, long lastModified) {
+    this.code = code;
+    this.args = args;
+    this.lastModified = lastModified;
+  }
+
+  public ProtocolStatus(int code) {
+    this(code, null);
+  }
+
+  public ProtocolStatus(int code, long lastModified) {
+    this(code, null, lastModified);
+  }
+
+  public ProtocolStatus(int code, Object message) {
+    this(code, message, 0L);
+  }
+
+  public ProtocolStatus(int code, Object message, long lastModified) {
+    this.code = code;
+    this.lastModified = lastModified;
+    if (message != null)
+      this.args = new String[] { String.valueOf(message) };
+  }
+
+  public ProtocolStatus(Throwable t) {
+    this(EXCEPTION, t);
+  }
+
+  public static ProtocolStatus read(DataInput in) throws IOException {
+    ProtocolStatus res = new ProtocolStatus();
+    res.readFields(in);
+    return res;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    byte version = in.readByte();
+    switch (version) {
+    case 1:
+      code = in.readByte();
+      lastModified = in.readLong();
+      args = WritableUtils.readCompressedStringArray(in);
+      break;
+    case VERSION:
+      code = in.readByte();
+      lastModified = in.readLong();
+      args = WritableUtils.readStringArray(in);
+      break;
+    default:
+      throw new VersionMismatchException(VERSION, version);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(VERSION);
+    out.writeByte((byte) code);
+    out.writeLong(lastModified);
+    if (args == null) {
+      out.writeInt(-1);
+    } else {
+      WritableUtils.writeStringArray(out, args);
+    }
+  }
+
+  public void setArgs(String[] args) {
+    this.args = args;
+  }
+
+  public String[] getArgs() {
+    return args;
+  }
+
+  public int getCode() {
+    return code;
+  }
+
+  public String getName() {
+    return codeToName.get(this.code);
+  }
+
+  public void setCode(int code) {
+    this.code = code;
+  }
+
+  public boolean isSuccess() {
+    return code == SUCCESS;
+  }
+
+  public boolean isTransientFailure() {
+    return code == ACCESS_DENIED || code == EXCEPTION || code == REDIR_EXCEEDED
+        || code == RETRY || code == TEMP_MOVED || code == WOULDBLOCK
+        || code == PROTO_NOT_FOUND;
+  }
+
+  public boolean isPermanentFailure() {
+    return code == FAILED || code == GONE || code == MOVED || code == NOTFOUND
+        || code == ROBOTS_DENIED;
+  }
+
+  public boolean isRedirect() {
+      return code == MOVED || code == TEMP_MOVED;
+  }
+
+  public String getMessage() {
+    if (args != null && args.length > 0)
+      return args[0];
+    return null;
+  }
+
+  public void setMessage(String msg) {
+    if (args != null && args.length > 0)
+      args[0] = msg;
+    else
+      args = new String[] { msg };
+  }
+
+  public long getLastModified() {
+    return lastModified;
+  }
+
+  public void setLastModified(long lastModified) {
+    this.lastModified = lastModified;
+  }
+
+  public boolean equals(Object o) {
+    if (o == null)
+      return false;
+    if (!(o instanceof ProtocolStatus))
+      return false;
+    ProtocolStatus other = (ProtocolStatus) o;
+    if (this.code != other.code || this.lastModified != other.lastModified)
+      return false;
+    return Arrays.equals(this.args, other.args);
+  }
+
+  public String toString() {
+    StringBuilder res = new StringBuilder();
+    res.append(codeToName.get(code)).append("(").append(code)
+        .append("), lastModified=").append(lastModified);
+    if (args != null) {
+      if (args.length == 1) {
+        res.append(": " + String.valueOf(args[0]));
+      } else {
+        for (int i = 0; i < args.length; i++) {
+          if (args[i] != null)
+            res.append(", args[" + i + "]=" + String.valueOf(args[i]));
+        }
+      }
+    }
+    return res.toString();
+  }
+}
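
ProtocolStatus is a Hadoop Writable: write() emits a version byte, the status
code, lastModified and the args array, and the static read() dispatches on the
leading version byte so records written by older (version 1) code stay
readable. A minimal round-trip sketch (class name and URL invented for
illustration):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.nutch.protocol.ProtocolStatus;

    public class ProtocolStatusRoundTrip {
      public static void main(String[] args) throws Exception {
        ProtocolStatus moved = new ProtocolStatus(ProtocolStatus.MOVED,
            "http://example.com/new-location");

        // serialize to an in-memory stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        moved.write(new DataOutputStream(bytes));

        // deserialize: read() checks the version byte, then the fields
        ProtocolStatus copy = ProtocolStatus.read(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // moved(12), lastModified=0: http://example.com/new-location
      }
    }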

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java b/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java
new file mode 100644
index 0000000..475aef4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.protocol;
+
+// JDK imports
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.LineNumberReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+// Commons Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Nutch imports
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.SuffixStringMatcher;
+
+import crawlercommons.robots.BaseRobotRules;
+import crawlercommons.robots.SimpleRobotRules;
+import crawlercommons.robots.SimpleRobotRules.RobotRulesMode;
+import crawlercommons.robots.SimpleRobotRulesParser;
+
+/**
+ * This class uses crawler-commons for handling the parsing of
+ * {@code robots.txt} files. It emits SimpleRobotRules objects, which describe
+ * the download permissions as described in SimpleRobotRulesParser.
+ * 
+ * Protocol-specific implementations have to implement the method
+ * {@link #getRobotRulesSet(Protocol, URL)}.
+ */
+public abstract class RobotRulesParser implements Tool {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(RobotRulesParser.class);
+
+  protected static final Hashtable<String, BaseRobotRules> CACHE = new Hashtable<String, BaseRobotRules>();
+  
+  /**
+   * A {@link BaseRobotRules} object appropriate for use when the
+   * {@code robots.txt} file is empty or missing; all requests are allowed.
+   */
+  public static final BaseRobotRules EMPTY_RULES = new SimpleRobotRules(
+      RobotRulesMode.ALLOW_ALL);
+
+  /**
+   * A {@link BaseRobotRules} object appropriate for use when the
+   * {@code robots.txt} file is not fetched due to a {@code 403/Forbidden}
+   * response; all requests are disallowed.
+   */
+  public static BaseRobotRules FORBID_ALL_RULES = new SimpleRobotRules(
+      RobotRulesMode.ALLOW_NONE);
+
+  private static SimpleRobotRulesParser robotParser = new SimpleRobotRulesParser();
+  protected Configuration conf;
+  protected String agentNames;
+
+  /** set of host names or IPs to be explicitly excluded from robots.txt checking */
+  protected Set<String> whiteList = new HashSet<String>();
+  
+  /* Matcher used for efficiently matching URLs against a set of suffixes. */
+  private SuffixStringMatcher matcher = null;
+
+  public RobotRulesParser() {
+  }
+
+  public RobotRulesParser(Configuration conf) {
+    setConf(conf);
+  }
+
+  /**
+   * Set the {@link Configuration} object
+   */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+
+    // Grab the agent names we advertise to robots files.
+    String agentName = conf.get("http.agent.name");
+    if (agentName == null || (agentName = agentName.trim()).isEmpty()) {
+      throw new RuntimeException("Agent name not configured!");
+    }
+    agentNames = agentName;
+
+    // If there are any other agents specified, append those to the list of
+    // agents
+    String otherAgents = conf.get("http.robots.agents");
+    if (otherAgents != null && !otherAgents.trim().isEmpty()) {
+      StringTokenizer tok = new StringTokenizer(otherAgents, ",");
+      StringBuilder sb = new StringBuilder(agentNames);
+      while (tok.hasMoreTokens()) {
+        String str = tok.nextToken().trim();
+        // skip the wildcard "*" and the agent name itself
+        // (required for backward compatibility, cf. NUTCH-1715 and
+        // NUTCH-1718)
+        if (!str.equals("*") && !str.equals(agentName)) {
+          sb.append(",").append(str);
+        }
+      }
+
+      agentNames = sb.toString();
+    }
+
+    String[] confWhiteList = conf.getStrings("http.robot.rules.whitelist");
+    if (confWhiteList == null) {
+      LOG.info("robots.txt whitelist not configured.");
+    } else {
+      for (String host : confWhiteList) {
+        if (host.isEmpty()) {
+          LOG.info("Empty whitelisted URL skipped!");
+          continue;
+        }
+        whiteList.add(host);
+      }
+      
+      if (whiteList.size() > 0) {
+        matcher = new SuffixStringMatcher(whiteList);
+        LOG.info("Whitelisted hosts: " + whiteList);
+      }
+    }
+  }
+
+  /**
+   * Get the {@link Configuration} object
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Check whether a URL belongs to a whitelisted host.
+   */
+  public boolean isWhiteListed(URL url) {
+    return matcher != null && matcher.matches(url.getHost());
+  }
+
+  /**
+   * Parses the robots content using the {@link SimpleRobotRulesParser} from
+   * crawler commons
+   * 
+   * @param url
+   *          A string containing url
+   * @param content
+   *          Contents of the robots file in a byte array
+   * @param contentType
+   *          The content type of the robots file
+   * @param robotName
+   *          A string containing all the robots agent names used by parser for
+   *          matching
+   * @return BaseRobotRules object
+   */
+  public BaseRobotRules parseRules(String url, byte[] content,
+      String contentType, String robotName) {
+    return robotParser.parseContent(url, content, contentType, robotName);
+  }
+
+  public BaseRobotRules getRobotRulesSet(Protocol protocol, Text url) {
+    URL u = null;
+    try {
+      u = new URL(url.toString());
+    } catch (Exception e) {
+      return EMPTY_RULES;
+    }
+    return getRobotRulesSet(protocol, u);
+  }
+
+  /**
+   * Fetch robots.txt (or its protocol-specific equivalent) which applies to
+   * the given URL, parse it and return the set of robot rules applicable for
+   * the configured agent name(s).
+   * 
+   * @param protocol
+   *          protocol implementation
+   * @param url
+   *          URL to be checked whether fetching is allowed by robot rules
+   * @return robot rules
+   */
+  public abstract BaseRobotRules getRobotRulesSet(Protocol protocol, URL url);
+
+  @Override
+  public int run(String[] args) {
+
+    if (args.length < 2) {
+      String[] help = {
+          "Usage: RobotRulesParser <robots-file> <url-file> [<agent-names>]\n",
+          "\tThe <robots-file> will be parsed as a robots.txt file,",
+          "\tusing the given <agent-name> to select rules.",
+          "\tURLs will be read (one per line) from <url-file>,",
+          "\tand tested against the rules.",
+          "\tMultiple agent names can be provided using",
+          "\tcomma as a delimiter without any spaces.",
+          "\tIf no agent name is given the property http.agent.name",
+          "\tis used. If http.agent.name is empty, robots.txt is checked",
+          "\tfor rules assigned to the user agent `*' (meaning any other)." };
+      for (String s : help) {
+        System.err.println(s);
+      }
+      System.exit(-1);
+    }
+
+    File robotsFile = new File(args[0]);
+    File urlFile = new File(args[1]);
+
+    if (args.length > 2) {
+      // set agent name from command-line in configuration and update parser
+      String agents = args[2];
+      conf.set("http.agent.name", agents);
+      setConf(conf);
+    }
+
+    try {
+      BaseRobotRules rules = getRobotRulesSet(null, robotsFile.toURI().toURL());
+
+      LineNumberReader testsIn = new LineNumberReader(new FileReader(urlFile));
+      String testPath = testsIn.readLine();
+      while (testPath != null) {
+        testPath = testPath.trim();
+        try {
+          // testPath can be just a path or a complete URL
+          URL url = new URL(testPath);
+          String status;
+          if (isWhiteListed(url)) {
+            status = "whitelisted";
+          } else if (rules.isAllowed(testPath)) {
+            status = "allowed";
+          } else {
+            status = "not allowed";
+          }
+          System.out.println(status + ":\t" + testPath);
+        } catch (MalformedURLException e) {
+          // ignore lines that are not valid URLs
+        }
+        testPath = testsIn.readLine();
+      }
+      testsIn.close();
+    } catch (IOException e) {
+      LOG.error("Failed to run: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+
+    return 0;
+  }
+
+  /**
+   * {@link RobotRulesParser} implementation which expects the location of the
+   * robots.txt file to be passed as a URL (usually pointing to a local file)
+   * to {@link #getRobotRulesSet(Protocol, URL)}.
+   */
+  private static class TestRobotRulesParser extends RobotRulesParser {
+
+    public TestRobotRulesParser(Configuration conf) {
+      // make sure that agent name is set so that setConf() does not complain,
+      // the agent name is later overwritten by command-line argument
+      if (conf.get("http.agent.name") == null) {
+        conf.set("http.agent.name", "*");
+      }
+      setConf(conf);
+    }
+
+    /**
+     * @param protocol  (ignored)
+     * @param url
+     *          location of the robots.txt file
+     */
+    public BaseRobotRules getRobotRulesSet(Protocol protocol, URL url) {
+      BaseRobotRules rules;
+      try {
+        int contentLength = url.openConnection().getContentLength();
+        byte[] robotsBytes = new byte[contentLength];
+        InputStream openStream = url.openStream();
+        // read(byte[], int, int) may return fewer bytes than requested,
+        // so loop until the buffer is full or EOF is reached
+        int offset = 0, n;
+        while (offset < contentLength
+            && (n = openStream.read(robotsBytes, offset, contentLength - offset)) != -1) {
+          offset += n;
+        }
+        openStream.close();
+        rules = robotParser.parseContent(url.toString(), robotsBytes,
+            "text/plain", this.conf.get("http.agent.name"));
+      } catch (IOException e) {
+        LOG.error("Failed to open robots.txt file " + url + ": "
+            + StringUtils.stringifyException(e));
+        rules = EMPTY_RULES;
+      }
+      return rules;
+    }
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = NutchConfiguration.create();
+    int res = ToolRunner.run(conf, new TestRobotRulesParser(conf), args);
+    System.exit(res);
+  }
+
+}
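
The non-abstract surface of RobotRulesParser can be exercised without a real
protocol implementation: parseRules() delegates straight to crawler-commons.
A minimal sketch, assuming the agent name and robots.txt content below (the
subclass name and URLs are invented for illustration):

    import java.net.URL;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.protocol.Protocol;
    import org.apache.nutch.protocol.RobotRulesParser;
    import org.apache.nutch.util.NutchConfiguration;

    import crawlercommons.robots.BaseRobotRules;

    public class RobotRulesSketch extends RobotRulesParser {

      public RobotRulesSketch(Configuration conf) {
        super(conf);
      }

      @Override
      public BaseRobotRules getRobotRulesSet(Protocol protocol, URL url) {
        // real implementations fetch, parse and cache the robots.txt file
        return EMPTY_RULES;
      }

      public static void main(String[] args) {
        Configuration conf = NutchConfiguration.create();
        conf.set("http.agent.name", "my-crawler"); // required by setConf()
        RobotRulesSketch parser = new RobotRulesSketch(conf);

        byte[] robotsTxt = "User-agent: *\nDisallow: /private/".getBytes();
        BaseRobotRules rules = parser.parseRules(
            "http://example.com/robots.txt", robotsTxt, "text/plain",
            "my-crawler");
        System.out.println(rules.isAllowed("http://example.com/private/x")); // false
        System.out.println(rules.isAllowed("http://example.com/index"));     // true
      }
    }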

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java b/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java
new file mode 100644
index 0000000..6685249
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes related to the {@link org.apache.nutch.protocol.Protocol Protocol} interface,
+ * see also {@link org.apache.nutch.net.protocols}.
+ */
+package org.apache.nutch.protocol;
+

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java b/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java
new file mode 100644
index 0000000..d74c7fb
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.scoring;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
+
+public abstract class AbstractScoringFilter implements ScoringFilter {
+
+  private Configuration conf;
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public void injectedScore(Text url, CrawlDatum datum)
+      throws ScoringFilterException {
+  }
+
+  public void initialScore(Text url, CrawlDatum datum)
+      throws ScoringFilterException {
+  }
+
+  public float generatorSortValue(Text url, CrawlDatum datum, float initSort)
+      throws ScoringFilterException {
+    return initSort;
+  }
+
+  public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content)
+      throws ScoringFilterException {
+  }
+
+  public void passScoreAfterParsing(Text url, Content content, Parse parse)
+      throws ScoringFilterException {
+  }
+
+  public CrawlDatum distributeScoreToOutlinks(Text fromUrl,
+      ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets,
+      CrawlDatum adjust, int allCount) throws ScoringFilterException {
+    return adjust;
+  }
+
+  public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum,
+      List<CrawlDatum> inlinked) throws ScoringFilterException {
+  }
+
+  @Override
+  public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
+      CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore)
+      throws ScoringFilterException {
+    return initScore;
+  }
+
+}
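
AbstractScoringFilter exists so that concrete scoring plugins only override
the callbacks they care about; everything else stays a no-op. A hypothetical
subclass that boosts the generator sort value of HTTPS pages (class name and
boost factor are invented for illustration):

    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.scoring.AbstractScoringFilter;
    import org.apache.nutch.scoring.ScoringFilterException;

    public class HttpsBoostScoringFilter extends AbstractScoringFilter {

      @Override
      public float generatorSortValue(Text url, CrawlDatum datum, float initSort)
          throws ScoringFilterException {
        // all other ScoringFilter callbacks keep the inherited no-op behavior
        return url.toString().startsWith("https://") ? initSort * 1.5f : initSort;
      }
    }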

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java
new file mode 100644
index 0000000..4061a75
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.plugin.Pluggable;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * A contract defining behavior of scoring plugins.
+ * 
+ * A scoring filter will manipulate scoring variables in CrawlDatum and in
+ * resulting search indexes. Filters can be chained in a specific order, to
+ * provide multi-stage scoring adjustments.
+ * 
+ * @author Andrzej Bialecki
+ */
+public interface ScoringFilter extends Configurable, Pluggable {
+  /** The name of the extension point. */
+  public final static String X_POINT_ID = ScoringFilter.class.getName();
+
+  /**
+   * Set an initial score for newly injected pages. Note: newly injected pages
+   * may have no inlinks, so filter implementations may wish to set this score
+   * to a non-zero value, to give newly injected pages some initial credit.
+   * 
+   * @param url
+   *          url of the page
+   * @param datum
+   *          new datum. Filters will modify it in-place.
+   * @throws ScoringFilterException
+   */
+  public void injectedScore(Text url, CrawlDatum datum)
+      throws ScoringFilterException;
+
+  /**
+   * Set an initial score for newly discovered pages. Note: newly discovered
+   * pages have at least one inlink with its score contribution, so filter
+   * implementations may choose to set initial score to zero (unknown value),
+   * and then the inlink score contribution will set the "real" value of the new
+   * page.
+   * 
+   * @param url
+   *          url of the page
+   * @param datum
+   *          new datum. Filters will modify it in-place.
+   * @throws ScoringFilterException
+   */
+  public void initialScore(Text url, CrawlDatum datum)
+      throws ScoringFilterException;
+
+  /**
+   * This method prepares a sort value for the purpose of sorting and selecting
+   * top N scoring pages during fetchlist generation.
+   * 
+   * @param url
+   *          url of the page
+   * @param datum
+   *          page's datum, should not be modified
+   * @param initSort
+   *          initial sort value, or a value from previous filters in chain
+   */
+  public float generatorSortValue(Text url, CrawlDatum datum, float initSort)
+      throws ScoringFilterException;
+
+  /**
+   * This method takes all relevant score information from the current datum
+   * (coming from a generated fetchlist) and stores it into
+   * {@link org.apache.nutch.protocol.Content} metadata. This is needed in order
+ * to pass these values to the mechanism that distributes them to outlinked
+ * pages.
+   * 
+   * @param url
+   *          url of the page
+   * @param datum
+   *          source datum. NOTE: modifications to this value are not persisted.
+   * @param content
+   *          instance of content. Implementations may modify this in-place,
+   *          primarily by setting some metadata properties.
+   */
+  public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content)
+      throws ScoringFilterException;
+
+  /**
+   * Currently a part of score distribution is performed using only data coming
+   * from the parsing process. We need this method in order to ensure the
+   * presence of score data in these steps.
+   * 
+   * @param url
+   *          page url
+   * @param content
+   *          original content. NOTE: modifications to this value are not
+   *          persisted.
+   * @param parse
+   *          target instance to copy the score information to. Implementations
+   *          may modify this in-place, primarily by setting some metadata
+   *          properties.
+   */
+  public void passScoreAfterParsing(Text url, Content content, Parse parse)
+      throws ScoringFilterException;
+
+  /**
+   * Distribute score value from the current page to all its outlinked pages.
+   * 
+   * @param fromUrl
+   *          url of the source page
+   * @param parseData
+   *          ParseData instance, which stores relevant score value(s) in its
+   *          metadata. NOTE: filters may modify this in-place, all changes will
+   *          be persisted.
+   * @param targets
+   *          &lt;url, CrawlDatum&gt; pairs. NOTE: filters can modify this
+   *          in-place, all changes will be persisted.
+   * @param adjust
+   *          a CrawlDatum instance, initially null, which implementations may
+   *          use to pass adjustment values to the original CrawlDatum. When
+   *          creating this instance, set its status to
+   *          {@link CrawlDatum#STATUS_LINKED}.
+   * @param allCount
+   *          number of all collected outlinks from the source page
+   * @return if needed, implementations may return an instance of CrawlDatum,
+   *         with status {@link CrawlDatum#STATUS_LINKED}, which contains
+   *         adjustments to be applied to the original CrawlDatum score(s) and
+   *         metadata. This can be null if not needed.
+   * @throws ScoringFilterException
+   */
+  public CrawlDatum distributeScoreToOutlinks(Text fromUrl,
+      ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets,
+      CrawlDatum adjust, int allCount) throws ScoringFilterException;
+
+  /**
+   * This method calculates a new score of CrawlDatum during CrawlDb update,
+   * based on the initial value of the original CrawlDatum, and also score
+   * values contributed by inlinked pages.
+   * 
+   * @param url
+   *          url of the page
+   * @param old
+   *          original datum, with original score. May be null if this is a
+   *          newly discovered page. If not null, filters should use score
+   *          values from this parameter as the starting values - the
+   *          <code>datum</code> parameter may contain values that are no longer
+          valid, if other updates occurred between generation and this
+   *          update.
+   * @param datum
+   *          the new datum, with the original score saved at the time when
+   *          fetchlist was generated. Filters should update this in-place, and
+   *          it will be saved in the crawldb.
+   * @param inlinked
+   *          (partial) list of CrawlDatum-s (with their scores) from links
+   *          pointing to this page, found in the current update batch.
+   * @throws ScoringFilterException
+   */
+  public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum,
+      List<CrawlDatum> inlinked) throws ScoringFilterException;
+
+  /**
+   * This method calculates a Lucene document boost.
+   * 
+   * @param url
+   *          url of the page
+   * @param doc
+   *          Lucene document. NOTE: this already contains all information
+   *          collected by indexing filters. Implementations may modify this
+   *          instance, in order to store/remove some information.
+   * @param dbDatum
+   *          current page from CrawlDb. NOTE: changes made to this instance are
+   *          not persisted.
+   * @param fetchDatum
+   *          datum from FetcherOutput (containing among others the fetching
+   *          status)
+   * @param parse
+   *          parsing result. NOTE: changes made to this instance are not
+   *          persisted.
+   * @param inlinks
+   *          current inlinks from LinkDb. NOTE: changes made to this instance
+   *          are not persisted.
+   * @param initScore
+   *          initial boost value for the Lucene document.
+   * @return boost value for the Lucene document. This value is passed as an
+   *         argument to the next scoring filter in chain. NOTE: implementations
+   *         may also express other scoring strategies by modifying Lucene
+   *         document directly.
+   * @throws ScoringFilterException
+   */
+  public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
+      CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore)
+      throws ScoringFilterException;
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java
new file mode 100644
index 0000000..f363c4b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring;
+
+/**
+ * Specialized exception for errors during scoring.
+ * 
+ * @author Andrzej Bialecki
+ */
+@SuppressWarnings("serial")
+public class ScoringFilterException extends Exception {
+
+  public ScoringFilterException() {
+    super();
+  }
+
+  public ScoringFilterException(String message) {
+    super(message);
+  }
+
+  public ScoringFilterException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ScoringFilterException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java
new file mode 100644
index 0000000..5bad78f
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.scoring;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Creates and caches {@link ScoringFilter} implementing plugins.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class ScoringFilters extends Configured implements ScoringFilter {
+
+  private ScoringFilter[] filters;
+
+  public ScoringFilters(Configuration conf) {
+    super(conf);
+    this.filters = (ScoringFilter[]) PluginRepository.get(conf)
+        .getOrderedPlugins(ScoringFilter.class, ScoringFilter.X_POINT_ID,
+            "scoring.filter.order");
+  }
+
+  /** Calculate a sort value for Generate. */
+  public float generatorSortValue(Text url, CrawlDatum datum, float initSort)
+      throws ScoringFilterException {
+    for (int i = 0; i < this.filters.length; i++) {
+      initSort = this.filters[i].generatorSortValue(url, datum, initSort);
+    }
+    return initSort;
+  }
+
+  /** Calculate a new initial score, used when adding newly discovered pages. */
+  public void initialScore(Text url, CrawlDatum datum)
+      throws ScoringFilterException {
+    for (int i = 0; i < this.filters.length; i++) {
+      this.filters[i].initialScore(url, datum);
+    }
+  }
+
+  /** Calculate a new initial score, used when injecting new pages. */
+  public void injectedScore(Text url, CrawlDatum datum)
+      throws ScoringFilterException {
+    for (int i = 0; i < this.filters.length; i++) {
+      this.filters[i].injectedScore(url, datum);
+    }
+  }
+
+  /** Calculate updated page score during CrawlDb.update(). */
+  public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum,
+      List<CrawlDatum> inlinked) throws ScoringFilterException {
+    for (int i = 0; i < this.filters.length; i++) {
+      this.filters[i].updateDbScore(url, old, datum, inlinked);
+    }
+  }
+
+  public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content)
+      throws ScoringFilterException {
+    for (int i = 0; i < this.filters.length; i++) {
+      this.filters[i].passScoreBeforeParsing(url, datum, content);
+    }
+  }
+
+  public void passScoreAfterParsing(Text url, Content content, Parse parse)
+      throws ScoringFilterException {
+    for (int i = 0; i < this.filters.length; i++) {
+      this.filters[i].passScoreAfterParsing(url, content, parse);
+    }
+  }
+
+  public CrawlDatum distributeScoreToOutlinks(Text fromUrl,
+      ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets,
+      CrawlDatum adjust, int allCount) throws ScoringFilterException {
+    for (int i = 0; i < this.filters.length; i++) {
+      adjust = this.filters[i].distributeScoreToOutlinks(fromUrl, parseData,
+          targets, adjust, allCount);
+    }
+    return adjust;
+  }
+
+  public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
+      CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore)
+      throws ScoringFilterException {
+    for (int i = 0; i < this.filters.length; i++) {
+      initScore = this.filters[i].indexerScore(url, doc, dbDatum, fetchDatum,
+          parse, inlinks, initScore);
+    }
+    return initScore;
+  }
+
+}
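
ScoringFilters is the facade the Nutch jobs call: it resolves the plugins
named by scoring.filter.order and runs each callback down the chain, feeding
the result of one filter into the next. A minimal sketch of the injector-side
calls, assuming a configured plugin repository on the classpath (class name
and URL invented for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.scoring.ScoringFilters;
    import org.apache.nutch.util.NutchConfiguration;

    public class ScoringChainSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        ScoringFilters filters = new ScoringFilters(conf);

        Text url = new Text("http://example.com/");
        CrawlDatum datum = new CrawlDatum();

        // each registered ScoringFilter may adjust the datum in-place
        filters.injectedScore(url, datum);
        // ... and fold its contribution into the generator sort value
        float sort = filters.generatorSortValue(url, datum, datum.getScore());
        System.out.println("sort value: " + sort);
      }
    }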

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java b/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java
new file mode 100644
index 0000000..b6a578b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The {@link org.apache.nutch.scoring.ScoringFilter ScoringFilter} interface.
+ */
+package org.apache.nutch.scoring;
+

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java
new file mode 100644
index 0000000..67c9366
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A class for holding link information including the url, anchor text, a score,
+ * the timestamp of the link and a link type.
+ */
+public class LinkDatum implements Writable {
+
+  public final static byte INLINK = 1;
+  public final static byte OUTLINK = 2;
+
+  private String url = null;
+  private String anchor = "";
+  private float score = 0.0f;
+  private long timestamp = 0L;
+  private byte linkType = 0;
+
+  /**
+   * Default constructor, no url, timestamp, score, or link type.
+   */
+  public LinkDatum() {
+
+  }
+
+  /**
+   * Creates a LinkDatum with a given url. Timestamp is set to current time.
+   * 
+   * @param url
+   *          The link url.
+   */
+  public LinkDatum(String url) {
+    this(url, "", System.currentTimeMillis());
+  }
+
+  /**
+   * Creates a LinkDatum with a url and an anchor text. Timestamp is set to
+   * current time.
+   * 
+   * @param url
+   *          The link url.
+   * @param anchor
+   *          The link anchor text.
+   */
+  public LinkDatum(String url, String anchor) {
+    this(url, anchor, System.currentTimeMillis());
+  }
+
+  public LinkDatum(String url, String anchor, long timestamp) {
+    this.url = url;
+    this.anchor = anchor;
+    this.timestamp = timestamp;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  public String getAnchor() {
+    return anchor;
+  }
+
+  public void setAnchor(String anchor) {
+    this.anchor = anchor;
+  }
+
+  public float getScore() {
+    return score;
+  }
+
+  public void setScore(float score) {
+    this.score = score;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  public byte getLinkType() {
+    return linkType;
+  }
+
+  public void setLinkType(byte linkType) {
+    this.linkType = linkType;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    url = Text.readString(in);
+    anchor = Text.readString(in);
+    score = in.readFloat();
+    timestamp = in.readLong();
+    linkType = in.readByte();
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, url);
+    Text.writeString(out, anchor != null ? anchor : "");
+    out.writeFloat(score);
+    out.writeLong(timestamp);
+    out.writeByte(linkType);
+  }
+
+  public String toString() {
+
+    String type = (linkType == INLINK ? "inlink"
+        : (linkType == OUTLINK) ? "outlink" : "unknown");
+    return "url: " + url + ", anchor: " + anchor + ", score: " + score
+        + ", timestamp: " + timestamp + ", link type: " + type;
+  }
+}
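
LinkDatum serializes field by field in exactly the order readFields() expects,
so instances round-trip through plain Data streams. A small, hypothetical
round-trip sketch (class name and URL invented for illustration):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.nutch.scoring.webgraph.LinkDatum;

    public class LinkDatumRoundTrip {
      public static void main(String[] args) throws Exception {
        LinkDatum out = new LinkDatum("http://example.com/a", "anchor text");
        out.setScore(0.25f);
        out.setLinkType(LinkDatum.OUTLINK);

        // write(): url, anchor, score, timestamp, link type
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buf));

        LinkDatum in = new LinkDatum();
        in.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(in); // url: http://example.com/a, anchor: anchor text, ...
      }
    }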

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
new file mode 100644
index 0000000..1569c4d
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * The LinkDumper tool creates a database of node to inlink information that can
+ * be read using the nested Reader class. This allows the inlink and scoring
+ * state of a single url to be reviewed quickly to determine why a given url is
+ * ranking a certain way. This tool is to be used with the LinkRank analysis.
+ */
+public class LinkDumper extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(LinkDumper.class);
+  public static final String DUMP_DIR = "linkdump";
+
+  /**
+   * Reader class which will print out the url and all of its inlinks to
+   * standard output. Each inlink will be displayed with its node information,
+   * including score and number of inlinks and outlinks.
+   */
+  public static class Reader {
+
+    public static void main(String[] args) throws Exception {
+
+      if (args == null || args.length < 2) {
+        System.out.println("LinkDumper$Reader usage: <webgraphdb> <url>");
+        return;
+      }
+
+      // open the readers for the linkdump directory
+      Configuration conf = NutchConfiguration.create();
+      FileSystem fs = FileSystem.get(conf);
+      Path webGraphDb = new Path(args[0]);
+      String url = args[1];
+      MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
+          webGraphDb, DUMP_DIR), conf);
+
+      // get the link nodes for the url
+      Text key = new Text(url);
+      LinkNodes nodes = new LinkNodes();
+      MapFileOutputFormat.getEntry(readers,
+          new HashPartitioner<Text, LinkNodes>(), key, nodes);
+
+      // print out the link nodes
+      LinkNode[] linkNodesAr = nodes.getLinks();
+      System.out.println(url + ":");
+      for (LinkNode node : linkNodesAr) {
+        System.out.println("  " + node.getUrl() + " - "
+            + node.getNode().toString());
+      }
+
+      // close the readers
+      FSUtils.closeReaders(readers);
+    }
+  }
+
+  /**
+   * Bean class which holds url to node information.
+   */
+  public static class LinkNode implements Writable {
+
+    private String url = null;
+    private Node node = null;
+
+    public LinkNode() {
+
+    }
+
+    public LinkNode(String url, Node node) {
+      this.url = url;
+      this.node = node;
+    }
+
+    public String getUrl() {
+      return url;
+    }
+
+    public void setUrl(String url) {
+      this.url = url;
+    }
+
+    public Node getNode() {
+      return node;
+    }
+
+    public void setNode(Node node) {
+      this.node = node;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      url = in.readUTF();
+      node = new Node();
+      node.readFields(in);
+    }
+
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(url);
+      node.write(out);
+    }
+
+  }
+
+  /**
+   * Writable class which holds an array of LinkNode objects.
+   */
+  public static class LinkNodes implements Writable {
+
+    private LinkNode[] links;
+
+    public LinkNodes() {
+
+    }
+
+    public LinkNodes(LinkNode[] links) {
+      this.links = links;
+    }
+
+    public LinkNode[] getLinks() {
+      return links;
+    }
+
+    public void setLinks(LinkNode[] links) {
+      this.links = links;
+    }
+
+    public void readFields(DataInput in) throws IOException {
+      int numLinks = in.readInt();
+      if (numLinks > 0) {
+        links = new LinkNode[numLinks];
+        for (int i = 0; i < numLinks; i++) {
+          LinkNode node = new LinkNode();
+          node.readFields(in);
+          links[i] = node;
+        }
+      }
+    }
+
+    public void write(DataOutput out) throws IOException {
+      // always write the count so that readFields() can rely on it,
+      // even when the links array is null or empty
+      int numLinks = (links == null) ? 0 : links.length;
+      out.writeInt(numLinks);
+      for (int i = 0; i < numLinks; i++) {
+        links[i].write(out);
+      }
+    }
+  }
+
+  /**
+   * Inverts outlinks from the WebGraph to inlinks and attaches node
+   * information.
+   */
+  public static class Inverter implements
+      Mapper<Text, Writable, Text, ObjectWritable>,
+      Reducer<Text, ObjectWritable, Text, LinkNode> {
+
+    private JobConf conf;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Wraps all values in ObjectWritables.
+     */
+    public void map(Text key, Writable value,
+        OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+        throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Inverts outlinks to inlinks while attaching node information to the
+     * outlink.
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+        OutputCollector<Text, LinkNode> output, Reporter reporter)
+        throws IOException {
+
+      String fromUrl = key.toString();
+      List<LinkDatum> outlinks = new ArrayList<LinkDatum>();
+      Node node = null;
+      
+      // loop through all values aggregating outlinks, saving node
+      while (values.hasNext()) {
+        ObjectWritable write = values.next();
+        Object obj = write.get();
+        if (obj instanceof Node) {
+          node = (Node) obj;
+        } else if (obj instanceof LinkDatum) {
+          outlinks.add(WritableUtils.clone((LinkDatum) obj, conf));
+        }
+      }
+
+      // only collect if a node was found and it has outlinks
+      if (node != null && node.getNumOutlinks() > 0) {
+        for (int i = 0; i < outlinks.size(); i++) {
+          LinkDatum outlink = outlinks.get(i);
+          String toUrl = outlink.getUrl();
+
+          // collect the outlink as an inlink with the node
+          output.collect(new Text(toUrl), new LinkNode(fromUrl, node));
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Merges LinkNode objects into a single array value per url. This allows all
+   * values to be quickly retrieved and printed via the Reader tool.
+   */
+  public static class Merger implements
+      Reducer<Text, LinkNode, Text, LinkNodes> {
+
+    private JobConf conf;
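+    // at most this many inlinks are kept per url, bounding the record size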
+    private int maxInlinks = 50000;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Aggregate all LinkNode objects for a given url.
+     */
+    public void reduce(Text key, Iterator<LinkNode> values,
+        OutputCollector<Text, LinkNodes> output, Reporter reporter)
+        throws IOException {
+
+      List<LinkNode> nodeList = new ArrayList<LinkNode>();
+      int numNodes = 0;
+
+      while (values.hasNext()) {
+        LinkNode cur = values.next();
+        if (numNodes < maxInlinks) {
+          nodeList.add(WritableUtils.clone(cur, conf));
+          numNodes++;
+        } else {
+          break;
+        }
+      }
+
+      LinkNode[] linkNodesAr = nodeList.toArray(new LinkNode[nodeList.size()]);
+      LinkNodes linkNodes = new LinkNodes(linkNodesAr);
+      output.collect(key, linkNodes);
+    }
+
+    public void close() {
+
+    }
+  }
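+
+  /*
+   * A minimal sketch of reading the merged output back, similar to what the
+   * nested Reader tool does. The filesystem, configuration, webGraphDb path
+   * and example url below are assumptions, not part of this class:
+   *
+   *   MapFile.Reader[] readers =
+   *       MapFileOutputFormat.getReaders(fs, new Path(webGraphDb, DUMP_DIR), conf);
+   *   Text url = new Text("http://example.com/");
+   *   LinkNodes nodes = new LinkNodes();
+   *   MapFileOutputFormat.getEntry(readers,
+   *       new HashPartitioner<Text, LinkNodes>(), url, nodes);
+   */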
+
+  /**
+   * Runs the inverter and merger jobs of the LinkDumper tool to create the
+   * url-to-inlink-node database.
+   */
+  public void dumpLinks(Path webGraphDb) throws IOException {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("NodeDumper: starting at " + sdf.format(start));
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    Path linkdump = new Path(webGraphDb, DUMP_DIR);
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Path outlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+
+    // run the inverter job
+    Path tempInverted = new Path(webGraphDb, "inverted-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    JobConf inverter = new NutchJob(conf);
+    inverter.setJobName("LinkDumper: inverter");
+    FileInputFormat.addInputPath(inverter, nodeDb);
+    FileInputFormat.addInputPath(inverter, outlinkDb);
+    inverter.setInputFormat(SequenceFileInputFormat.class);
+    inverter.setMapperClass(Inverter.class);
+    inverter.setReducerClass(Inverter.class);
+    inverter.setMapOutputKeyClass(Text.class);
+    inverter.setMapOutputValueClass(ObjectWritable.class);
+    inverter.setOutputKeyClass(Text.class);
+    inverter.setOutputValueClass(LinkNode.class);
+    FileOutputFormat.setOutputPath(inverter, tempInverted);
+    inverter.setOutputFormat(SequenceFileOutputFormat.class);
+
+    try {
+      LOG.info("LinkDumper: running inverter");
+      JobClient.runJob(inverter);
+      LOG.info("LinkDumper: finished inverter");
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    // run the merger job
+    JobConf merger = new NutchJob(conf);
+    merger.setJobName("LinkDumper: merger");
+    FileInputFormat.addInputPath(merger, tempInverted);
+    merger.setInputFormat(SequenceFileInputFormat.class);
+    merger.setReducerClass(Merger.class);
+    merger.setMapOutputKeyClass(Text.class);
+    merger.setMapOutputValueClass(LinkNode.class);
+    merger.setOutputKeyClass(Text.class);
+    merger.setOutputValueClass(LinkNodes.class);
+    FileOutputFormat.setOutputPath(merger, linkdump);
+    merger.setOutputFormat(MapFileOutputFormat.class);
+
+    try {
+      LOG.info("LinkDumper: running merger");
+      JobClient.runJob(merger);
+      LOG.info("LinkDumper: finished merger");
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+
+    fs.delete(tempInverted, true);
+    long end = System.currentTimeMillis();
+    LOG.info("LinkDumper: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
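+  /**
+   * Illustrative invocation (the bin/nutch wrapper, class package, and the
+   * webgraphdb path are example assumptions):
+   *
+   *   bin/nutch org.apache.nutch.scoring.webgraph.LinkDumper \
+   *     -webgraphdb crawl/webgraphdb
+   */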
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkDumper(),
+        args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the LinkDumper tool. This simply creates the database; to read the
+   * values, the nested Reader tool must be used.
+   */
+  public int run(String[] args) throws Exception {
+
+    Options options = new Options();
+    OptionBuilder.withArgName("help");
+    OptionBuilder.withDescription("show this help message");
+    Option helpOpts = OptionBuilder.create("help");
+    options.addOption(helpOpts);
+
+    OptionBuilder.withArgName("webgraphdb");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the web graph database to use");
+    Option webGraphDbOpts = OptionBuilder.create("webgraphdb");
+    options.addOption(webGraphDbOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("LinkDumper", options);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+      dumpLinks(new Path(webGraphDb));
+      return 0;
+    } catch (Exception e) {
+      LOG.error("LinkDumper: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java
new file mode 100644
index 0000000..bd22828
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+public class LinkRank extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(LinkRank.class);
+  private static final String NUM_NODES = "_num_nodes_";
+
+  /**
+   * Runs the counter job. The counter job determines the number of nodes in
+   * the webgraph. This is used during analysis.
+   * 
+   * @param fs
+   *          The job file system.
+   * @param webGraphDb
+   *          The web graph database to use.
+   * 
+   * @return The number of nodes in the web graph.
+   * @throws IOException
+   *           If an error occurs while running the counter job.
+   */
+  private int runCounter(FileSystem fs, Path webGraphDb) throws IOException {
+
+    // configure the counter job
+    Path numLinksPath = new Path(webGraphDb, NUM_NODES);
+    Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    JobConf counter = new NutchJob(getConf());
+    counter.setJobName("LinkRank Counter");
+    FileInputFormat.addInputPath(counter, nodeDb);
+    FileOutputFormat.setOutputPath(counter, numLinksPath);
+    counter.setInputFormat(SequenceFileInputFormat.class);
+    counter.setMapperClass(Counter.class);
+    counter.setCombinerClass(Counter.class);
+    counter.setReducerClass(Counter.class);
+    counter.setMapOutputKeyClass(Text.class);
+    counter.setMapOutputValueClass(LongWritable.class);
+    counter.setOutputKeyClass(Text.class);
+    counter.setOutputValueClass(LongWritable.class);
+    counter.setNumReduceTasks(1);
+    counter.setOutputFormat(TextOutputFormat.class);
+    counter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    // run the counter job, outputs to a single reduce task and file
+    LOG.info("Starting link counter job");
+    try {
+      JobClient.runJob(counter);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished link counter job");
+
+    // read the first (and only) line from the file, which should be the
+    // number of nodes in the web graph
+    LOG.info("Reading numlinks temp file");
+    FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
+    BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+    String numLinksLine = buffer.readLine();
+    readLinks.close();
+
+    // check if there are links to process, if none, webgraph might be empty
+    if (numLinksLine == null || numLinksLine.length() == 0) {
+      fs.delete(numLinksPath, true);
+      throw new IOException("No links to process, is the webgraph empty?");
+    }
+
+    // delete the temp file, then convert and return the number of nodes as an int
+    LOG.info("Deleting numlinks temp file");
+    fs.delete(numLinksPath, true);
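+    // TextOutputFormat writes "<key><TAB><value>", so the single output line
+    // looks like "_num_nodes_<TAB>12345"; take the second column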
+    String numLinks = numLinksLine.split("\\s+")[1];
+    return Integer.parseInt(numLinks);
+  }
+
+  /**
+   * Runs the initializer job. The initializer job sets up the nodes with a
+   * default starting score for link analysis.
+   * 
+   * @param nodeDb
+   *          The node database to use.
+   * @param output
+   *          The job output directory.
+   * 
+   * @throws IOException
+   *           If an error occurs while running the initializer job.
+   */
+  private void runInitializer(Path nodeDb, Path output) throws IOException {
+
+    // configure the initializer
+    JobConf initializer = new NutchJob(getConf());
+    initializer.setJobName("LinkAnalysis Initializer");
+    FileInputFormat.addInputPath(initializer, nodeDb);
+    FileOutputFormat.setOutputPath(initializer, output);
+    initializer.setInputFormat(SequenceFileInputFormat.class);
+    initializer.setMapperClass(Initializer.class);
+    initializer.setMapOutputKeyClass(Text.class);
+    initializer.setMapOutputValueClass(Node.class);
+    initializer.setOutputKeyClass(Text.class);
+    initializer.setOutputValueClass(Node.class);
+    initializer.setOutputFormat(MapFileOutputFormat.class);
+    initializer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    // run the initializer
+    LOG.info("Starting initialization job");
+    try {
+      JobClient.runJob(initializer);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished initialization job.");
+  }
+
+  /**
+   * Runs the inverter job. The inverter job flips outlinks to inlinks to be
+   * passed into the analysis job.
+   * 
+   * @param nodeDb
+   *          The node database to use.
+   * @param outlinkDb
+   *          The outlink database to use.
+   * @param output
+   *          The output directory.
+   * 
+   * @throws IOException
+   *           If an error occurs while running the inverter job.
+   */
+  private void runInverter(Path nodeDb, Path outlinkDb, Path output)
+      throws IOException {
+
+    // configure the inverter
+    JobConf inverter = new NutchJob(getConf());
+    inverter.setJobName("LinkAnalysis Inverter");
+    FileInputFormat.addInputPath(inverter, nodeDb);
+    FileInputFormat.addInputPath(inverter, outlinkDb);
+    FileOutputFormat.setOutputPath(inverter, output);
+    inverter.setInputFormat(SequenceFileInputFormat.class);
+    inverter.setMapperClass(Inverter.class);
+    inverter.setReducerClass(Inverter.class);
+    inverter.setMapOutputKeyClass(Text.class);
+    inverter.setMapOutputValueClass(ObjectWritable.class);
+    inverter.setOutputKeyClass(Text.class);
+    inverter.setOutputValueClass(LinkDatum.class);
+    inverter.setOutputFormat(SequenceFileOutputFormat.class);
+    inverter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    // run the inverter job
+    LOG.info("Starting inverter job");
+    try {
+      JobClient.runJob(inverter);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished inverter job.");
+  }
+
+  /**
+   * Runs the link analysis job. The link analysis job applies the link rank
+   * formula to create a score per url and stores that score in the NodeDb.
+   * 
+   * Typically the link analysis job is run a number of times to allow the link
+   * rank scores to converge.
+   * 
+   * @param nodeDb
+   *          The node database from which we are getting previous link rank
+   *          scores.
+   * @param inverted
+   *          The inverted inlinks
+   * @param output
+   *          The link analysis output.
+   * @param iteration
+   *          The current iteration number.
+   * @param numIterations
+   *          The total number of link analysis iterations.
+   * @param rankOne
+   *          The rank one score used as the baseline inlink contribution.
+   * 
+   * @throws IOException
+   *           If an error occurs during link analysis.
+   */
+  private void runAnalysis(Path nodeDb, Path inverted, Path output,
+      int iteration, int numIterations, float rankOne) throws IOException {
+
+    JobConf analyzer = new NutchJob(getConf());
+    analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
+    analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
+        + " of " + numIterations);
+    FileInputFormat.addInputPath(analyzer, nodeDb);
+    FileInputFormat.addInputPath(analyzer, inverted);
+    FileOutputFormat.setOutputPath(analyzer, output);
+    analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
+    analyzer.setMapOutputKeyClass(Text.class);
+    analyzer.setMapOutputValueClass(ObjectWritable.class);
+    analyzer.setInputFormat(SequenceFileInputFormat.class);
+    analyzer.setMapperClass(Analyzer.class);
+    analyzer.setReducerClass(Analyzer.class);
+    analyzer.setOutputKeyClass(Text.class);
+    analyzer.setOutputValueClass(Node.class);
+    analyzer.setOutputFormat(MapFileOutputFormat.class);
+    analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
+
+    LOG.info("Starting analysis job");
+    try {
+      JobClient.runJob(analyzer);
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+      throw e;
+    }
+    LOG.info("Finished analysis job.");
+  }
+
+  /**
+   * The Counter job that determines the total number of nodes in the WebGraph.
+   * This is used to determine a rank one score for pages that have zero
+   * inlinks but do contain outlinks.
+   */
+  private static class Counter implements
+      Mapper<Text, Node, Text, LongWritable>,
+      Reducer<Text, LongWritable, Text, LongWritable> {
+
+    private static Text numNodes = new Text(NUM_NODES);
+    private static LongWritable one = new LongWritable(1L);
+
+    public void configure(JobConf conf) {
+    }
+
+    /**
+     * Outputs one for every node.
+     */
+    public void map(Text key, Node value,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+      output.collect(numNodes, one);
+    }
+
+    /**
+     * Sums the per-node counts and outputs a single total value.
+     */
+    public void reduce(Text key, Iterator<LongWritable> values,
+        OutputCollector<Text, LongWritable> output, Reporter reporter)
+        throws IOException {
+
+      long total = 0;
+      while (values.hasNext()) {
+        total += values.next().get();
+      }
+      output.collect(numNodes, new LongWritable(total));
+    }
+
+    public void close() {
+    }
+  }
+
+  private static class Initializer implements Mapper<Text, Node, Text, Node> {
+
+    private JobConf conf;
+    private float initialScore = 1.0f;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      initialScore = conf.getFloat("link.analyze.initial.score", 1.0f);
+    }
+
+    public void map(Text key, Node node, OutputCollector<Text, Node> output,
+        Reporter reporter) throws IOException {
+
+      String url = key.toString();
+      Node outNode = WritableUtils.clone(node, conf);
+      outNode.setInlinkScore(initialScore);
+
+      output.collect(new Text(url), outNode);
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Inverts outlinks and attaches current score from the NodeDb of the
+   * WebGraph. The link analysis process consists of inverting, analyzing and
+   * scoring, in a loop for a given number of iterations.
+   */
+  private static class Inverter implements
+      Mapper<Text, Writable, Text, ObjectWritable>,
+      Reducer<Text, ObjectWritable, Text, LinkDatum> {
+
+    private JobConf conf;
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+    }
+
+    /**
+     * Convert values to ObjectWritable
+     */
+    public void map(Text key, Writable value,
+        OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+        throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(value);
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Inverts outlinks to inlinks, attaches current score for the outlink from
+     * the NodeDb of the WebGraph.
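+     *
+     * For example (urls illustrative): given key "A" with outlink score s and
+     * an outlink A -> B, this emits ("B", LinkDatum(url="A", score=s)).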
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+        OutputCollector<Text, LinkDatum> output, Reporter reporter)
+        throws IOException {
+
+      String fromUrl = key.toString();
+      List<LinkDatum> outlinks = new ArrayList<LinkDatum>();
+      Node node = null;
+
+      // aggregate outlinks, assign other values
+      while (values.hasNext()) {
+        ObjectWritable write = values.next();
+        Object obj = write.get();
+        if (obj instanceof Node) {
+          node = (Node) obj;
+        } else if (obj instanceof LinkDatum) {
+          outlinks.add(WritableUtils.clone((LinkDatum) obj, conf));
+        }
+      }
+
+      // get the number of outlinks and the current inlink and outlink scores
+      // from the node of the url, skipping keys without a node entry
+      if (node == null) {
+        return;
+      }
+      int numOutlinks = node.getNumOutlinks();
+      float inlinkScore = node.getInlinkScore();
+      float outlinkScore = node.getOutlinkScore();
+      LOG.debug(fromUrl + ": num outlinks " + numOutlinks);
+
+      // can't invert if no outlinks
+      if (numOutlinks > 0) {
+        for (int i = 0; i < outlinks.size(); i++) {
+          LinkDatum outlink = outlinks.get(i);
+          String toUrl = outlink.getUrl();
+
+          outlink.setUrl(fromUrl);
+          outlink.setScore(outlinkScore);
+
+          // collect the inverted outlink
+          output.collect(new Text(toUrl), outlink);
+          LOG.debug(toUrl + ": inverting inlink from " + fromUrl
+              + " origscore: " + inlinkScore + " numOutlinks: " + numOutlinks
+              + " inlinkscore: " + outlinkScore);
+        }
+      }
+    }
+
+    public void close() {
+    }
+  }
+
+  /**
+   * Runs a single link analysis iteration.
+   */
+  private static class Analyzer implements
+      Mapper<Text, Writable, Text, ObjectWritable>,
+      Reducer<Text, ObjectWritable, Text, Node> {
+
+    private JobConf conf;
+    private float dampingFactor = 0.85f;
+    private float rankOne = 0.0f;
+    private int itNum = 0;
+    private boolean limitPages = true;
+    private boolean limitDomains = true;
+
+    /**
+     * Configures the job, sets the damping factor, rank one score, and other
+     * needed values for analysis.
+     */
+    public void configure(JobConf conf) {
+
+      try {
+        this.conf = conf;
+        this.dampingFactor = conf
+            .getFloat("link.analyze.damping.factor", 0.85f);
+        this.rankOne = conf.getFloat("link.analyze.rank.one", 0.0f);
+        this.itNum = conf.getInt("link.analyze.iteration", 0);
+        limitPages = conf.getBoolean("link.ignore.limit.page", true);
+        limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+      } catch (Exception e) {
+        LOG.error(StringUtils.stringifyException(e));
+        throw new IllegalArgumentException(e);
+      }
+    }
+
+    /**
+     * Convert values to ObjectWritable
+     */
+    public void map(Text key, Writable value,
+        OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+        throws IOException {
+
+      ObjectWritable objWrite = new ObjectWritable();
+      objWrite.set(WritableUtils.clone(value, conf));
+      output.collect(key, objWrite);
+    }
+
+    /**
+     * Performs a single iteration of link analysis. The resulting scores are
+     * stored in a temporary NodeDb which replaces the NodeDb of the WebGraph.
+     */
+    public void reduce(Text key, Iterator<ObjectWritable> values,
+        OutputCollector<Text, Node> output, Reporter reporter)
+        throws IOException {
+
+      String url = key.toString();
+      Set<String> domains = new HashSet<String>();
+      Set<String> pages = new HashSet<String>();
+      Node node = null;
+
+      // a page with zero inlinks has a score of rankOne
+      int numInlinks = 0;
+      float totalInlinkScore = rankOne;
+
+      while (values.hasNext()) {
+
+        ObjectWritable next = values.next();
+        Object value = next.get();
+        if (value instanceof Node) {
+          node = (Node) value;
+        } else if (value instanceof LinkDatum) {
+
+          LinkDatum linkDatum = (LinkDatum) value;
+          float scoreFromInlink = linkDatum.getScore();
+          String inlinkUrl = linkDatum.getUrl();
+          String inLinkDomain = URLUtil.getDomainName(inlinkUrl);
+          String inLinkPage = URLUtil.getPage(inlinkUrl);
+
+          // limit counting duplicate inlinks by pages or domains
+          if ((limitPages && pages.contains(inLinkPage))
+              || (limitDomains && domains.contains(inLinkDomain))) {
+            LOG.debug(url + ": ignoring " + scoreFromInlink + " from "
+                + inlinkUrl + ", duplicate page or domain");
+            continue;
+          }
+
+          // aggregate total inlink score
+          numInlinks++;
+          totalInlinkScore += scoreFromInlink;
+          domains.add(inLinkDomain);
+          pages.add(inLinkPage);
+          LOG.debug(url + ": adding " + scoreFromInlink + " from " + inlinkUrl
+              + ", total: " + totalInlinkScore);
+        }
+      }
+
+      // calculate linkRank score formula
+      float linkRankScore = (1 - this.dampingFactor)
+          + (this.dampingFactor * totalInlinkScore);
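+      // i.e. the PageRank-style update: newScore = (1 - d) + d * (sum of
+      // inlink score contributions), where the sum is seeded with the rank
+      // one score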
+
+      LOG.debug(url + ": score: " + linkRankScore + " num inlinks: "
+          + numInlinks + " iteration: " + itNum);
+
+      // store the score in a temporary NodeDb, skipping urls that have
+      // inlinks but no node entry
+      if (node == null) {
+        return;
+      }
+      Node outNode = WritableUtils.clone(node, conf);
+      outNode.setInlinkScore(linkRankScore);
+      output.collect(key, outNode);
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+  /**
+   * Default constructor.
+   */
+  public LinkRank() {
+    super();
+  }
+
+  /**
+   * Configurable constructor.
+   */
+  public LinkRank(Configuration conf) {
+    super(conf);
+  }
+
+  public void close() {
+  }
+
+  /**
+   * Runs the complete link analysis job. The complete job determines the rank
+   * one score, then runs through a given number of invert and analyze
+   * iterations (10 by default), and finally replaces the NodeDb in the
+   * WebGraph with the link rank output.
+   * 
+   * @param webGraphDb
+   *          The WebGraph to run link analysis on.
+   * 
+   * @throws IOException
+   *           If an error occurs during link analysis.
+   */
+  public void analyze(Path webGraphDb) throws IOException {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("Analysis: starting at " + sdf.format(start));
+
+    // store the link rank under the webgraphdb temporarily; final scores get
+    // updated into the nodedb
+    Path linkRank = new Path(webGraphDb, "linkrank");
+    Configuration conf = getConf();
+    FileSystem fs = FileSystem.get(conf);
+
+    // create the linkrank directory if needed
+    if (!fs.exists(linkRank)) {
+      fs.mkdirs(linkRank);
+    }
+
+    // the webgraph outlink and node database paths
+    Path wgOutlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+    Path wgNodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+    Path nodeDb = new Path(linkRank, WebGraph.NODE_DIR);
+
+    // get the total number of nodes in the webgraph, used for rank one, then
+    // initialize all urls with a default score
+    int numNodes = runCounter(fs, webGraphDb);
+    runInitializer(wgNodeDb, nodeDb);
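+    // rank one is the reciprocal of the node count, e.g. a webgraph with
+    // 1,000,000 nodes gives a rank one score of 0.000001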
+    float rankOneScore = (1f / (float) numNodes);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Analysis: Number of links: " + numLinks);
+      LOG.info("Analysis: Rank One: " + rankOneScore);
+    }
+
+    // run invert and analysis for a given number of iterations to allow the
+    // link rank scores to converge
+    int numIterations = conf.getInt("link.analyze.num.iterations", 10);
+    for (int i = 0; i < numIterations; i++) {
+
+      // the input to inverting is always the previous output from analysis
+      LOG.info("Analysis: Starting iteration " + (i + 1) + " of "
+          + numIterations);
+      Path tempRank = new Path(linkRank + "-"
+          + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+      fs.mkdirs(tempRank);
+      Path tempInverted = new Path(tempRank, "inverted");
+      Path tempNodeDb = new Path(tempRank, WebGraph.NODE_DIR);
+
+      // run invert and analysis
+      runInverter(nodeDb, wgOutlinkDb, tempInverted);
+      runAnalysis(nodeDb, tempInverted, tempNodeDb, i, numIterations,
+          rankOneScore);
+
+      // replace the temporary NodeDb with the output from analysis
+      LOG.info("Analysis: Installing new link scores");
+      FSUtils.replace(fs, linkRank, tempRank, true);
+      LOG.info("Analysis: finished iteration " + (i + 1) + " of "
+          + numIterations);
+    }
+
+    // replace the NodeDb in the WebGraph with the final output of analysis
+    LOG.info("Analysis: Installing web graph nodes");
+    FSUtils.replace(fs, wgNodeDb, nodeDb, true);
+
+    // remove the temporary link rank folder
+    fs.delete(linkRank, true);
+    long end = System.currentTimeMillis();
+    LOG.info("Analysis: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
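+  /**
+   * Illustrative invocation (the bin/nutch wrapper and the webgraphdb path
+   * are example assumptions):
+   *
+   *   bin/nutch org.apache.nutch.scoring.webgraph.LinkRank \
+   *     -webgraphdb crawl/webgraphdb
+   */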
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new LinkRank(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Runs the LinkRank tool.
+   */
+  public int run(String[] args) throws Exception {
+
+    Options options = new Options();
+    OptionBuilder.withArgName("help");
+    OptionBuilder.withDescription("show this help message");
+    Option helpOpts = OptionBuilder.create("help");
+    options.addOption(helpOpts);
+
+    OptionBuilder.withArgName("webgraphdb");
+    OptionBuilder.hasArg();
+    OptionBuilder.withDescription("the web graph db to use");
+    Option webgraphOpts = OptionBuilder.create("webgraphdb");
+    options.addOption(webgraphOpts);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("LinkRank", options);
+        return -1;
+      }
+
+      String webGraphDb = line.getOptionValue("webgraphdb");
+
+      analyze(new Path(webGraphDb));
+      return 0;
+    } catch (Exception e) {
+      LOG.error("LinkAnalysis: " + StringUtils.stringifyException(e));
+      return -2;
+    }
+  }
+}