Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:56 UTC
[40/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java b/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java
new file mode 100644
index 0000000..9e75531
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/protocol/ProtocolStatus.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.VersionMismatchException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * @author Andrzej Bialecki
+ */
+public class ProtocolStatus implements Writable {
+
+ private final static byte VERSION = 2;
+
+ /** Content was retrieved without errors. */
+ public static final int SUCCESS = 1;
+ /** Content was not retrieved. Any further errors may be indicated in args. */
+ public static final int FAILED = 2;
+
+ /** This protocol was not found. Application may attempt to retry later. */
+ public static final int PROTO_NOT_FOUND = 10;
+ /** Resource is gone. */
+ public static final int GONE = 11;
+ /** Resource has moved permanently. New url should be found in args. */
+ public static final int MOVED = 12;
+ /** Resource has moved temporarily. New url should be found in args. */
+ public static final int TEMP_MOVED = 13;
+ /** Resource was not found. */
+ public static final int NOTFOUND = 14;
+ /** Temporary failure. Application may retry immediately. */
+ public static final int RETRY = 15;
+ /**
+   * Unspecified exception occurred. Further information may be provided in args.
+ */
+ public static final int EXCEPTION = 16;
+ /** Access denied - authorization required, but missing/incorrect. */
+ public static final int ACCESS_DENIED = 17;
+ /** Access denied by robots.txt rules. */
+ public static final int ROBOTS_DENIED = 18;
+ /** Too many redirects. */
+ public static final int REDIR_EXCEEDED = 19;
+ /** Not fetching. */
+ public static final int NOTFETCHING = 20;
+ /** Unchanged since the last fetch. */
+ public static final int NOTMODIFIED = 21;
+ /**
+ * Request was refused by protocol plugins, because it would block. The
+ * expected number of milliseconds to wait before retry may be provided in
+ * args.
+ */
+ public static final int WOULDBLOCK = 22;
+ /** Thread was blocked http.max.delays times during fetching. */
+ public static final int BLOCKED = 23;
+
+ // Useful static instances for status codes that don't usually require any
+ // additional arguments.
+ public static final ProtocolStatus STATUS_SUCCESS = new ProtocolStatus(
+ SUCCESS);
+ public static final ProtocolStatus STATUS_FAILED = new ProtocolStatus(FAILED);
+ public static final ProtocolStatus STATUS_GONE = new ProtocolStatus(GONE);
+ public static final ProtocolStatus STATUS_NOTFOUND = new ProtocolStatus(
+ NOTFOUND);
+ public static final ProtocolStatus STATUS_RETRY = new ProtocolStatus(RETRY);
+ public static final ProtocolStatus STATUS_ROBOTS_DENIED = new ProtocolStatus(
+ ROBOTS_DENIED);
+ public static final ProtocolStatus STATUS_REDIR_EXCEEDED = new ProtocolStatus(
+ REDIR_EXCEEDED);
+ public static final ProtocolStatus STATUS_NOTFETCHING = new ProtocolStatus(
+ NOTFETCHING);
+ public static final ProtocolStatus STATUS_NOTMODIFIED = new ProtocolStatus(
+ NOTMODIFIED);
+ public static final ProtocolStatus STATUS_WOULDBLOCK = new ProtocolStatus(
+ WOULDBLOCK);
+ public static final ProtocolStatus STATUS_BLOCKED = new ProtocolStatus(
+ BLOCKED);
+
+ private int code;
+ private long lastModified;
+ private String[] args;
+
+ private static final HashMap<Integer, String> codeToName = new HashMap<Integer, String>();
+ static {
+    codeToName.put(SUCCESS, "success");
+    codeToName.put(FAILED, "failed");
+    codeToName.put(PROTO_NOT_FOUND, "proto_not_found");
+    codeToName.put(GONE, "gone");
+    codeToName.put(MOVED, "moved");
+    codeToName.put(TEMP_MOVED, "temp_moved");
+    codeToName.put(NOTFOUND, "notfound");
+    codeToName.put(RETRY, "retry");
+    codeToName.put(EXCEPTION, "exception");
+    codeToName.put(ACCESS_DENIED, "access_denied");
+    codeToName.put(ROBOTS_DENIED, "robots_denied");
+    codeToName.put(REDIR_EXCEEDED, "redir_exceeded");
+    codeToName.put(NOTFETCHING, "notfetching");
+    codeToName.put(NOTMODIFIED, "notmodified");
+    codeToName.put(WOULDBLOCK, "wouldblock");
+    codeToName.put(BLOCKED, "blocked");
+ }
+
+ public ProtocolStatus() {
+
+ }
+
+ public ProtocolStatus(int code, String[] args) {
+ this.code = code;
+ this.args = args;
+ }
+
+ public ProtocolStatus(int code, String[] args, long lastModified) {
+ this.code = code;
+ this.args = args;
+ this.lastModified = lastModified;
+ }
+
+ public ProtocolStatus(int code) {
+ this(code, null);
+ }
+
+ public ProtocolStatus(int code, long lastModified) {
+ this(code, null, lastModified);
+ }
+
+ public ProtocolStatus(int code, Object message) {
+ this(code, message, 0L);
+ }
+
+ public ProtocolStatus(int code, Object message, long lastModified) {
+ this.code = code;
+ this.lastModified = lastModified;
+ if (message != null)
+ this.args = new String[] { String.valueOf(message) };
+ }
+
+ public ProtocolStatus(Throwable t) {
+ this(EXCEPTION, t);
+ }
+
+ public static ProtocolStatus read(DataInput in) throws IOException {
+ ProtocolStatus res = new ProtocolStatus();
+ res.readFields(in);
+ return res;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ byte version = in.readByte();
+ switch (version) {
+ case 1:
+ code = in.readByte();
+ lastModified = in.readLong();
+ args = WritableUtils.readCompressedStringArray(in);
+ break;
+ case VERSION:
+ code = in.readByte();
+ lastModified = in.readLong();
+ args = WritableUtils.readStringArray(in);
+ break;
+ default:
+ throw new VersionMismatchException(VERSION, version);
+ }
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeByte(VERSION);
+ out.writeByte((byte) code);
+ out.writeLong(lastModified);
+ if (args == null) {
+ out.writeInt(-1);
+ } else {
+ WritableUtils.writeStringArray(out, args);
+ }
+ }
+
+ public void setArgs(String[] args) {
+ this.args = args;
+ }
+
+ public String[] getArgs() {
+ return args;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getName() {
+ return codeToName.get(this.code);
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public boolean isSuccess() {
+ return code == SUCCESS;
+ }
+
+ public boolean isTransientFailure() {
+ return code == ACCESS_DENIED || code == EXCEPTION || code == REDIR_EXCEEDED
+ || code == RETRY || code == TEMP_MOVED || code == WOULDBLOCK
+ || code == PROTO_NOT_FOUND;
+ }
+
+ public boolean isPermanentFailure() {
+ return code == FAILED || code == GONE || code == MOVED || code == NOTFOUND
+ || code == ROBOTS_DENIED;
+ }
+
+ public boolean isRedirect() {
+ return code == MOVED || code == TEMP_MOVED;
+ }
+
+ public String getMessage() {
+ if (args != null && args.length > 0)
+ return args[0];
+ return null;
+ }
+
+ public void setMessage(String msg) {
+ if (args != null && args.length > 0)
+ args[0] = msg;
+ else
+ args = new String[] { msg };
+ }
+
+ public long getLastModified() {
+ return lastModified;
+ }
+
+ public void setLastModified(long lastModified) {
+ this.lastModified = lastModified;
+ }
+
+ public boolean equals(Object o) {
+ if (o == null)
+ return false;
+ if (!(o instanceof ProtocolStatus))
+ return false;
+ ProtocolStatus other = (ProtocolStatus) o;
+ if (this.code != other.code || this.lastModified != other.lastModified)
+ return false;
+ if (this.args == null) {
+ if (other.args == null)
+ return true;
+ else
+ return false;
+ } else {
+ if (other.args == null)
+ return false;
+ if (other.args.length != this.args.length)
+ return false;
+ for (int i = 0; i < this.args.length; i++) {
+ if (!this.args[i].equals(other.args[i]))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public String toString() {
+ StringBuffer res = new StringBuffer();
+    res.append(codeToName.get(code) + "(" + code
+        + "), lastModified=" + lastModified);
+ if (args != null) {
+ if (args.length == 1) {
+ res.append(": " + String.valueOf(args[0]));
+ } else {
+ for (int i = 0; i < args.length; i++) {
+ if (args[i] != null)
+ res.append(", args[" + i + "]=" + String.valueOf(args[i]));
+ }
+ }
+ }
+ return res.toString();
+ }
+}
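
A minimal sketch of the Writable contract implemented above: construct a status, serialize it with write(), and read it back with the static read() helper. The demo class name is illustrative; everything else comes from ProtocolStatus itself and the JDK.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.nutch.protocol.ProtocolStatus;

    public class ProtocolStatusRoundTrip {
      public static void main(String[] args) throws Exception {
        // A redirect status carrying the target URL in args[0]
        ProtocolStatus moved = new ProtocolStatus(ProtocolStatus.MOVED,
            "http://example.com/new-location");

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        moved.write(new DataOutputStream(bytes));

        ProtocolStatus copy = ProtocolStatus.read(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.isRedirect());  // true
        System.out.println(copy.getMessage());  // http://example.com/new-location
        System.out.println(copy.equals(moved)); // true
      }
    }
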
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java b/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java
new file mode 100644
index 0000000..475aef4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/protocol/RobotRulesParser.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.protocol;
+
+// JDK imports
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.LineNumberReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+// SLF4J logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Nutch imports
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.SuffixStringMatcher;
+
+import crawlercommons.robots.BaseRobotRules;
+import crawlercommons.robots.SimpleRobotRules;
+import crawlercommons.robots.SimpleRobotRules.RobotRulesMode;
+import crawlercommons.robots.SimpleRobotRulesParser;
+
+/**
+ * This class uses crawler-commons for handling the parsing of
+ * {@code robots.txt} files. It emits SimpleRobotRules objects, which describe
+ * the download permissions as described in SimpleRobotRulesParser.
+ *
+ * Protocol-specific implementations have to implement the method
+ * {@link #getRobotRulesSet(Protocol, URL)}.
+ */
+public abstract class RobotRulesParser implements Tool {
+
+ public static final Logger LOG = LoggerFactory
+ .getLogger(RobotRulesParser.class);
+
+ protected static final Hashtable<String, BaseRobotRules> CACHE = new Hashtable<String, BaseRobotRules>();
+
+ /**
+ * A {@link BaseRobotRules} object appropriate for use when the
+ * {@code robots.txt} file is empty or missing; all requests are allowed.
+ */
+ public static final BaseRobotRules EMPTY_RULES = new SimpleRobotRules(
+ RobotRulesMode.ALLOW_ALL);
+
+ /**
+ * A {@link BaseRobotRules} object appropriate for use when the
+ * {@code robots.txt} file is not fetched due to a {@code 403/Forbidden}
+ * response; all requests are disallowed.
+ */
+ public static BaseRobotRules FORBID_ALL_RULES = new SimpleRobotRules(
+ RobotRulesMode.ALLOW_NONE);
+
+ private static SimpleRobotRulesParser robotParser = new SimpleRobotRulesParser();
+ protected Configuration conf;
+ protected String agentNames;
+
+ /** set of host names or IPs to be explicitly excluded from robots.txt checking */
+ protected Set<String> whiteList = new HashSet<String>();
+
+  /* Matcher used for efficiently matching URLs against a set of suffixes. */
+ private SuffixStringMatcher matcher = null;
+
+ public RobotRulesParser() {
+ }
+
+ public RobotRulesParser(Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Set the {@link Configuration} object
+ */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+
+ // Grab the agent names we advertise to robots files.
+ String agentName = conf.get("http.agent.name");
+ if (agentName == null || (agentName = agentName.trim()).isEmpty()) {
+ throw new RuntimeException("Agent name not configured!");
+ }
+ agentNames = agentName;
+
+ // If there are any other agents specified, append those to the list of
+ // agents
+ String otherAgents = conf.get("http.robots.agents");
+ if (otherAgents != null && !otherAgents.trim().isEmpty()) {
+ StringTokenizer tok = new StringTokenizer(otherAgents, ",");
+ StringBuilder sb = new StringBuilder(agentNames);
+ while (tok.hasMoreTokens()) {
+ String str = tok.nextToken().trim();
+ if (str.equals("*") || str.equals(agentName)) {
+ // skip wildcard "*" or agent name itself
+ // (required for backward compatibility, cf. NUTCH-1715 and
+ // NUTCH-1718)
+ } else {
+ sb.append(",").append(str);
+ }
+ }
+
+ agentNames = sb.toString();
+ }
+
+ String[] confWhiteList = conf.getStrings("http.robot.rules.whitelist");
+ if (confWhiteList == null) {
+ LOG.info("robots.txt whitelist not configured.");
+    } else {
+ for (int i = 0; i < confWhiteList.length; i++) {
+ if (confWhiteList[i].isEmpty()) {
+ LOG.info("Empty whitelisted URL skipped!");
+ continue;
+ }
+ whiteList.add(confWhiteList[i]);
+ }
+
+ if (whiteList.size() > 0) {
+ matcher = new SuffixStringMatcher(whiteList);
+ LOG.info("Whitelisted hosts: " + whiteList);
+ }
+ }
+ }
+
+ /**
+ * Get the {@link Configuration} object
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Check whether a URL belongs to a whitelisted host.
+ */
+ public boolean isWhiteListed(URL url) {
+ boolean match = false;
+ String urlString = url.getHost();
+
+ if (matcher != null) {
+ match = matcher.matches(urlString);
+ }
+
+ return match;
+ }
+
+ /**
+   * Parses the robots content using the {@link SimpleRobotRulesParser} from
+   * crawler-commons.
+   *
+   * @param url
+   *          A string containing the URL of the robots.txt file
+ * @param content
+ * Contents of the robots file in a byte array
+ * @param contentType
+ * The content type of the robots file
+ * @param robotName
+ * A string containing all the robots agent names used by parser for
+ * matching
+ * @return BaseRobotRules object
+ */
+ public BaseRobotRules parseRules(String url, byte[] content,
+ String contentType, String robotName) {
+ return robotParser.parseContent(url, content, contentType, robotName);
+ }
+
+ public BaseRobotRules getRobotRulesSet(Protocol protocol, Text url) {
+ URL u = null;
+ try {
+ u = new URL(url.toString());
+ } catch (Exception e) {
+ return EMPTY_RULES;
+ }
+ return getRobotRulesSet(protocol, u);
+ }
+
+ /**
+   * Fetch robots.txt (or its protocol-specific equivalent) which applies to
+ * the given URL, parse it and return the set of robot rules applicable for
+ * the configured agent name(s).
+ *
+ * @param protocol
+ * protocol implementation
+ * @param url
+ * URL to be checked whether fetching is allowed by robot rules
+ * @return robot rules
+ */
+ public abstract BaseRobotRules getRobotRulesSet(Protocol protocol, URL url);
+
+ @Override
+ public int run(String[] args) {
+
+ if (args.length < 2) {
+ String[] help = {
+ "Usage: RobotRulesParser <robots-file> <url-file> [<agent-names>]\n",
+ "\tThe <robots-file> will be parsed as a robots.txt file,",
+ "\tusing the given <agent-name> to select rules.",
+ "\tURLs will be read (one per line) from <url-file>,",
+ "\tand tested against the rules.",
+ "\tMultiple agent names can be provided using",
+ "\tcomma as a delimiter without any spaces.",
+ "\tIf no agent name is given the property http.agent.name",
+ "\tis used. If http.agent.name is empty, robots.txt is checked",
+ "\tfor rules assigned to the user agent `*' (meaning any other)." };
+ for (String s : help) {
+ System.err.println(s);
+ }
+ System.exit(-1);
+ }
+
+ File robotsFile = new File(args[0]);
+ File urlFile = new File(args[1]);
+
+ if (args.length > 2) {
+ // set agent name from command-line in configuration and update parser
+ String agents = args[2];
+ conf.set("http.agent.name", agents);
+ setConf(conf);
+ }
+
+ try {
+ BaseRobotRules rules = getRobotRulesSet(null, robotsFile.toURI().toURL());
+
+ LineNumberReader testsIn = new LineNumberReader(new FileReader(urlFile));
+      String testPath = testsIn.readLine();
+      while (testPath != null) {
+        testPath = testPath.trim();
+ try {
+ // testPath can be just a path or a complete URL
+ URL url = new URL(testPath);
+ String status;
+ if (isWhiteListed(url)) {
+ status = "whitelisted";
+ } else if (rules.isAllowed(testPath)) {
+ status = "allowed";
+ } else {
+ status = "not allowed";
+ }
+ System.out.println(status + ":\t" + testPath);
+        } catch (MalformedURLException e) {
+          // skip lines that are not valid URLs
+        }
+ testPath = testsIn.readLine();
+ }
+ testsIn.close();
+ } catch (IOException e) {
+ LOG.error("Failed to run: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+
+ return 0;
+ }
+
+ /**
+ * {@link RobotRulesParser} implementation which expects the location of the
+ * robots.txt passed by URL (usually pointing to a local file) in
+   * {@link #getRobotRulesSet(Protocol, URL)}.
+ */
+ private static class TestRobotRulesParser extends RobotRulesParser {
+
+ public TestRobotRulesParser(Configuration conf) {
+ // make sure that agent name is set so that setConf() does not complain,
+ // the agent name is later overwritten by command-line argument
+ if (conf.get("http.agent.name") == null) {
+ conf.set("http.agent.name", "*");
+ }
+ setConf(conf);
+ }
+
+ /**
+ * @param protocol (ignored)
+ * @param url
+ * location of the robots.txt file
+     */
+ public BaseRobotRules getRobotRulesSet(Protocol protocol, URL url) {
+ BaseRobotRules rules;
+ try {
+        int contentLength = url.openConnection().getContentLength();
+        byte[] robotsBytes = new byte[contentLength];
+        InputStream openStream = url.openStream();
+        // InputStream.read() may return fewer bytes than requested, so loop
+        // until the whole robots.txt has been read (or EOF is reached)
+        int offset = 0;
+        while (offset < contentLength) {
+          int n = openStream.read(robotsBytes, offset, contentLength - offset);
+          if (n < 0)
+            break;
+          offset += n;
+        }
+        openStream.close();
+ rules = robotParser.parseContent(url.toString(), robotsBytes,
+ "text/plain", this.conf.get("http.agent.name"));
+ } catch (IOException e) {
+ LOG.error("Failed to open robots.txt file " + url
+ + StringUtils.stringifyException(e));
+ rules = EMPTY_RULES;
+ }
+ return rules;
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = NutchConfiguration.create();
+ int res = ToolRunner.run(conf, new TestRobotRulesParser(conf), args);
+ System.exit(res);
+ }
+
+}
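
To see the crawler-commons delegation in action, here is a hedged sketch that parses an in-memory robots.txt via parseRules() and tests URLs against the result. The anonymous subclass exists only to satisfy the single abstract method; the agent name "mybot" and the URLs are placeholders.

    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    import org.apache.nutch.protocol.Protocol;
    import org.apache.nutch.protocol.RobotRulesParser;

    import crawlercommons.robots.BaseRobotRules;

    public class RobotRulesDemo {
      public static void main(String[] args) {
        // Only getRobotRulesSet(Protocol, URL) is abstract, so an anonymous
        // subclass is enough for exercising parseRules()
        RobotRulesParser parser = new RobotRulesParser() {
          public BaseRobotRules getRobotRulesSet(Protocol protocol, URL url) {
            return EMPTY_RULES;
          }
        };

        byte[] robotsTxt = "User-agent: *\nDisallow: /private/\n"
            .getBytes(StandardCharsets.UTF_8);

        BaseRobotRules rules = parser.parseRules("http://example.com/robots.txt",
            robotsTxt, "text/plain", "mybot");

        System.out.println(rules.isAllowed("http://example.com/index.html")); // true
        System.out.println(rules.isAllowed("http://example.com/private/a"));  // false
      }
    }
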
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java b/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java
new file mode 100644
index 0000000..6685249
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/protocol/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Classes related to the {@link org.apache.nutch.protocol.Protocol Protocol} interface,
+ * see also {@link org.apache.nutch.net.protocols}.
+ */
+package org.apache.nutch.protocol;
+
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java b/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java
new file mode 100644
index 0000000..d74c7fb
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/AbstractScoringFilter.java
@@ -0,0 +1,68 @@
+package org.apache.nutch.scoring;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.scoring.ScoringFilter;
+import org.apache.nutch.scoring.ScoringFilterException;
+
+public abstract class AbstractScoringFilter implements ScoringFilter {
+
+ private Configuration conf;
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public void injectedScore(Text url, CrawlDatum datum)
+ throws ScoringFilterException {
+ }
+
+ public void initialScore(Text url, CrawlDatum datum)
+ throws ScoringFilterException {
+ }
+
+ public float generatorSortValue(Text url, CrawlDatum datum, float initSort)
+ throws ScoringFilterException {
+ return initSort;
+ }
+
+ public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content)
+ throws ScoringFilterException {
+ }
+
+ public void passScoreAfterParsing(Text url, Content content, Parse parse)
+ throws ScoringFilterException {
+ }
+
+ public CrawlDatum distributeScoreToOutlinks(Text fromUrl,
+ ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets,
+ CrawlDatum adjust, int allCount) throws ScoringFilterException {
+ return adjust;
+ }
+
+ public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum,
+ List<CrawlDatum> inlinked) throws ScoringFilterException {
+ }
+
+ @Override
+ public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
+ CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore)
+ throws ScoringFilterException {
+ return initScore;
+ }
+
+}
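
Since every method above is a no-op (or passes its input straight through), a concrete scoring plugin only overrides what it needs. A hypothetical example that gives injected pages a fixed starting score:

    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.scoring.AbstractScoringFilter;
    import org.apache.nutch.scoring.ScoringFilterException;

    /** Hypothetical plugin: every injected page starts with score 1.0. */
    public class FixedInjectScoringFilter extends AbstractScoringFilter {
      @Override
      public void injectedScore(Text url, CrawlDatum datum)
          throws ScoringFilterException {
        datum.setScore(1.0f); // modify the datum in place, per the contract
      }
    }

In a real deployment such a class would be registered as a plugin at the ScoringFilter extension point so that ScoringFilters (below) picks it up.
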
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java
new file mode 100644
index 0000000..4061a75
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilter.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.plugin.Pluggable;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * A contract defining behavior of scoring plugins.
+ *
+ * A scoring filter will manipulate scoring variables in CrawlDatum and in
+ * resulting search indexes. Filters can be chained in a specific order, to
+ * provide multi-stage scoring adjustments.
+ *
+ * @author Andrzej Bialecki
+ */
+public interface ScoringFilter extends Configurable, Pluggable {
+ /** The name of the extension point. */
+ public final static String X_POINT_ID = ScoringFilter.class.getName();
+
+ /**
+ * Set an initial score for newly injected pages. Note: newly injected pages
+ * may have no inlinks, so filter implementations may wish to set this score
+ * to a non-zero value, to give newly injected pages some initial credit.
+ *
+ * @param url
+ * url of the page
+ * @param datum
+ * new datum. Filters will modify it in-place.
+ * @throws ScoringFilterException
+ */
+ public void injectedScore(Text url, CrawlDatum datum)
+ throws ScoringFilterException;
+
+ /**
+ * Set an initial score for newly discovered pages. Note: newly discovered
+ * pages have at least one inlink with its score contribution, so filter
+ * implementations may choose to set initial score to zero (unknown value),
+ * and then the inlink score contribution will set the "real" value of the new
+ * page.
+ *
+ * @param url
+ * url of the page
+ * @param datum
+ * new datum. Filters will modify it in-place.
+ * @throws ScoringFilterException
+ */
+ public void initialScore(Text url, CrawlDatum datum)
+ throws ScoringFilterException;
+
+ /**
+ * This method prepares a sort value for the purpose of sorting and selecting
+ * top N scoring pages during fetchlist generation.
+ *
+ * @param url
+ * url of the page
+ * @param datum
+ * page's datum, should not be modified
+ * @param initSort
+ * initial sort value, or a value from previous filters in chain
+ */
+ public float generatorSortValue(Text url, CrawlDatum datum, float initSort)
+ throws ScoringFilterException;
+
+ /**
+ * This method takes all relevant score information from the current datum
+ * (coming from a generated fetchlist) and stores it into
+ * {@link org.apache.nutch.protocol.Content} metadata. This is needed in order
+ * to pass this value(s) to the mechanism that distributes it to outlinked
+ * pages.
+ *
+ * @param url
+ * url of the page
+ * @param datum
+ * source datum. NOTE: modifications to this value are not persisted.
+ * @param content
+ * instance of content. Implementations may modify this in-place,
+ * primarily by setting some metadata properties.
+ */
+ public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content)
+ throws ScoringFilterException;
+
+ /**
+ * Currently a part of score distribution is performed using only data coming
+ * from the parsing process. We need this method in order to ensure the
+ * presence of score data in these steps.
+ *
+ * @param url
+ * page url
+ * @param content
+ * original content. NOTE: modifications to this value are not
+ * persisted.
+ * @param parse
+ * target instance to copy the score information to. Implementations
+ * may modify this in-place, primarily by setting some metadata
+ * properties.
+ */
+ public void passScoreAfterParsing(Text url, Content content, Parse parse)
+ throws ScoringFilterException;
+
+ /**
+ * Distribute score value from the current page to all its outlinked pages.
+ *
+ * @param fromUrl
+ * url of the source page
+ * @param parseData
+ * ParseData instance, which stores relevant score value(s) in its
+ * metadata. NOTE: filters may modify this in-place, all changes will
+ * be persisted.
+ * @param targets
+   *          &lt;url, CrawlDatum&gt; pairs. NOTE: filters can modify this
+ * in-place, all changes will be persisted.
+ * @param adjust
+ * a CrawlDatum instance, initially null, which implementations may
+ * use to pass adjustment values to the original CrawlDatum. When
+ * creating this instance, set its status to
+ * {@link CrawlDatum#STATUS_LINKED}.
+ * @param allCount
+ * number of all collected outlinks from the source page
+ * @return if needed, implementations may return an instance of CrawlDatum,
+ * with status {@link CrawlDatum#STATUS_LINKED}, which contains
+ * adjustments to be applied to the original CrawlDatum score(s) and
+ * metadata. This can be null if not needed.
+ * @throws ScoringFilterException
+ */
+ public CrawlDatum distributeScoreToOutlinks(Text fromUrl,
+ ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets,
+ CrawlDatum adjust, int allCount) throws ScoringFilterException;
+
+ /**
+ * This method calculates a new score of CrawlDatum during CrawlDb update,
+ * based on the initial value of the original CrawlDatum, and also score
+ * values contributed by inlinked pages.
+ *
+ * @param url
+ * url of the page
+ * @param old
+ * original datum, with original score. May be null if this is a
+ * newly discovered page. If not null, filters should use score
+ * values from this parameter as the starting values - the
+ * <code>datum</code> parameter may contain values that are no longer
+   *          valid, if other updates occurred between generation and this
+ * update.
+ * @param datum
+ * the new datum, with the original score saved at the time when
+ * fetchlist was generated. Filters should update this in-place, and
+ * it will be saved in the crawldb.
+ * @param inlinked
+ * (partial) list of CrawlDatum-s (with their scores) from links
+ * pointing to this page, found in the current update batch.
+ * @throws ScoringFilterException
+ */
+ public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum,
+ List<CrawlDatum> inlinked) throws ScoringFilterException;
+
+ /**
+ * This method calculates a Lucene document boost.
+ *
+ * @param url
+ * url of the page
+ * @param doc
+ * Lucene document. NOTE: this already contains all information
+ * collected by indexing filters. Implementations may modify this
+ * instance, in order to store/remove some information.
+ * @param dbDatum
+ * current page from CrawlDb. NOTE: changes made to this instance are
+ * not persisted.
+ * @param fetchDatum
+ * datum from FetcherOutput (containing among others the fetching
+ * status)
+ * @param parse
+ * parsing result. NOTE: changes made to this instance are not
+ * persisted.
+ * @param inlinks
+ * current inlinks from LinkDb. NOTE: changes made to this instance
+ * are not persisted.
+ * @param initScore
+ * initial boost value for the Lucene document.
+ * @return boost value for the Lucene document. This value is passed as an
+ * argument to the next scoring filter in chain. NOTE: implementations
+ * may also express other scoring strategies by modifying Lucene
+ * document directly.
+ * @throws ScoringFilterException
+ */
+ public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
+ CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore)
+ throws ScoringFilterException;
+}
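
The distributeScoreToOutlinks() contract is the least obvious one, so here is a hedged sketch of an implementation that spreads a fixed unit score evenly over all outlinks; a real filter (e.g. the OPIC plugin) would instead read the score passed along in the Content/ParseData metadata. The class name is made up; it builds on the AbstractScoringFilter adapter shown earlier in this commit.

    import java.util.Collection;
    import java.util.Map.Entry;

    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.parse.ParseData;
    import org.apache.nutch.scoring.AbstractScoringFilter;
    import org.apache.nutch.scoring.ScoringFilterException;

    /** Hypothetical filter: spreads a unit score evenly over all outlinks. */
    public class EvenShareScoringFilter extends AbstractScoringFilter {
      @Override
      public CrawlDatum distributeScoreToOutlinks(Text fromUrl,
          ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets,
          CrawlDatum adjust, int allCount) throws ScoringFilterException {
        if (allCount == 0)
          return adjust;
        float share = 1.0f / allCount;       // equal share of a unit score
        for (Entry<Text, CrawlDatum> target : targets) {
          target.getValue().setScore(share); // in-place changes are persisted
        }
        return adjust; // no adjustment to the source page's own datum
      }
    }
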
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java
new file mode 100644
index 0000000..f363c4b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilterException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring;
+
+/**
+ * Specialized exception for errors during scoring.
+ *
+ * @author Andrzej Bialecki
+ */
+@SuppressWarnings("serial")
+public class ScoringFilterException extends Exception {
+
+ public ScoringFilterException() {
+ super();
+ }
+
+ public ScoringFilterException(String message) {
+ super(message);
+ }
+
+ public ScoringFilterException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ScoringFilterException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java
new file mode 100644
index 0000000..5bad78f
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/ScoringFilters.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.scoring;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Creates and caches {@link ScoringFilter} implementing plugins.
+ *
+ * @author Andrzej Bialecki
+ */
+public class ScoringFilters extends Configured implements ScoringFilter {
+
+ private ScoringFilter[] filters;
+
+ public ScoringFilters(Configuration conf) {
+ super(conf);
+ this.filters = (ScoringFilter[]) PluginRepository.get(conf)
+ .getOrderedPlugins(ScoringFilter.class, ScoringFilter.X_POINT_ID,
+ "scoring.filter.order");
+ }
+
+ /** Calculate a sort value for Generate. */
+ public float generatorSortValue(Text url, CrawlDatum datum, float initSort)
+ throws ScoringFilterException {
+ for (int i = 0; i < this.filters.length; i++) {
+ initSort = this.filters[i].generatorSortValue(url, datum, initSort);
+ }
+ return initSort;
+ }
+
+ /** Calculate a new initial score, used when adding newly discovered pages. */
+ public void initialScore(Text url, CrawlDatum datum)
+ throws ScoringFilterException {
+ for (int i = 0; i < this.filters.length; i++) {
+ this.filters[i].initialScore(url, datum);
+ }
+ }
+
+ /** Calculate a new initial score, used when injecting new pages. */
+ public void injectedScore(Text url, CrawlDatum datum)
+ throws ScoringFilterException {
+ for (int i = 0; i < this.filters.length; i++) {
+ this.filters[i].injectedScore(url, datum);
+ }
+ }
+
+ /** Calculate updated page score during CrawlDb.update(). */
+ public void updateDbScore(Text url, CrawlDatum old, CrawlDatum datum,
+ List<CrawlDatum> inlinked) throws ScoringFilterException {
+ for (int i = 0; i < this.filters.length; i++) {
+ this.filters[i].updateDbScore(url, old, datum, inlinked);
+ }
+ }
+
+ public void passScoreBeforeParsing(Text url, CrawlDatum datum, Content content)
+ throws ScoringFilterException {
+ for (int i = 0; i < this.filters.length; i++) {
+ this.filters[i].passScoreBeforeParsing(url, datum, content);
+ }
+ }
+
+ public void passScoreAfterParsing(Text url, Content content, Parse parse)
+ throws ScoringFilterException {
+ for (int i = 0; i < this.filters.length; i++) {
+ this.filters[i].passScoreAfterParsing(url, content, parse);
+ }
+ }
+
+ public CrawlDatum distributeScoreToOutlinks(Text fromUrl,
+ ParseData parseData, Collection<Entry<Text, CrawlDatum>> targets,
+ CrawlDatum adjust, int allCount) throws ScoringFilterException {
+ for (int i = 0; i < this.filters.length; i++) {
+ adjust = this.filters[i].distributeScoreToOutlinks(fromUrl, parseData,
+ targets, adjust, allCount);
+ }
+ return adjust;
+ }
+
+ public float indexerScore(Text url, NutchDocument doc, CrawlDatum dbDatum,
+ CrawlDatum fetchDatum, Parse parse, Inlinks inlinks, float initScore)
+ throws ScoringFilterException {
+ for (int i = 0; i < this.filters.length; i++) {
+ initScore = this.filters[i].indexerScore(url, doc, dbDatum, fetchDatum,
+ parse, inlinks, initScore);
+ }
+ return initScore;
+ }
+
+}
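
Usage of the chaining wrapper is straightforward; a minimal sketch, assuming a Nutch configuration with scoring plugins on the classpath (the URL and fetch interval are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.scoring.ScoringFilters;
    import org.apache.nutch.util.NutchConfiguration;

    public class ScoringFiltersDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        // Loads all ScoringFilter plugins, ordered by scoring.filter.order
        ScoringFilters filters = new ScoringFilters(conf);

        Text url = new Text("http://example.com/");
        CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED,
            30 * 24 * 3600);
        // Each configured filter adjusts the score in turn
        filters.injectedScore(url, datum);
        System.out.println(datum);
      }
    }
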
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java b/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java
new file mode 100644
index 0000000..b6a578b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The {@link org.apache.nutch.scoring.ScoringFilter ScoringFilter} interface.
+ */
+package org.apache.nutch.scoring;
+
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java
new file mode 100644
index 0000000..67c9366
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDatum.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A class for holding link information including the url, anchor text, a score,
+ * the timestamp of the link and a link type.
+ */
+public class LinkDatum implements Writable {
+
+ public final static byte INLINK = 1;
+ public final static byte OUTLINK = 2;
+
+ private String url = null;
+ private String anchor = "";
+ private float score = 0.0f;
+ private long timestamp = 0L;
+ private byte linkType = 0;
+
+ /**
+ * Default constructor, no url, timestamp, score, or link type.
+ */
+ public LinkDatum() {
+
+ }
+
+ /**
+ * Creates a LinkDatum with a given url. Timestamp is set to current time.
+ *
+ * @param url
+ * The link url.
+ */
+ public LinkDatum(String url) {
+ this(url, "", System.currentTimeMillis());
+ }
+
+ /**
+ * Creates a LinkDatum with a url and an anchor text. Timestamp is set to
+ * current time.
+ *
+ * @param url
+ * The link url.
+ * @param anchor
+ * The link anchor text.
+ */
+ public LinkDatum(String url, String anchor) {
+ this(url, anchor, System.currentTimeMillis());
+ }
+
+ public LinkDatum(String url, String anchor, long timestamp) {
+ this.url = url;
+ this.anchor = anchor;
+ this.timestamp = timestamp;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public String getAnchor() {
+ return anchor;
+ }
+
+ public void setAnchor(String anchor) {
+ this.anchor = anchor;
+ }
+
+ public float getScore() {
+ return score;
+ }
+
+ public void setScore(float score) {
+ this.score = score;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public byte getLinkType() {
+ return linkType;
+ }
+
+ public void setLinkType(byte linkType) {
+ this.linkType = linkType;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ url = Text.readString(in);
+ anchor = Text.readString(in);
+ score = in.readFloat();
+ timestamp = in.readLong();
+ linkType = in.readByte();
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, url);
+ Text.writeString(out, anchor != null ? anchor : "");
+ out.writeFloat(score);
+ out.writeLong(timestamp);
+ out.writeByte(linkType);
+ }
+
+ public String toString() {
+
+ String type = (linkType == INLINK ? "inlink"
+ : (linkType == OUTLINK) ? "outlink" : "unknown");
+ return "url: " + url + ", anchor: " + anchor + ", score: " + score
+ + ", timestamp: " + timestamp + ", link type: " + type;
+ }
+}
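
A quick round trip through the Writable methods above (all names except the demo class come from LinkDatum itself):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.nutch.scoring.webgraph.LinkDatum;

    public class LinkDatumRoundTrip {
      public static void main(String[] args) throws Exception {
        LinkDatum datum = new LinkDatum("http://example.com/page",
            "example anchor");
        datum.setScore(0.25f);
        datum.setLinkType(LinkDatum.OUTLINK);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        datum.write(new DataOutputStream(bytes));

        LinkDatum copy = new LinkDatum();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));

        // url: http://example.com/page, anchor: example anchor, score: 0.25, ...
        System.out.println(copy);
      }
    }
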
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
new file mode 100644
index 0000000..1569c4d
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkDumper.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * The LinkDumper tool creates a database of node to inlink information that can
+ * be read using the nested Reader class. This allows the inlink and scoring
+ * state of a single url to be reviewed quickly to determine why a given url is
+ * ranking a certain way. This tool is to be used with the LinkRank analysis.
+ */
+public class LinkDumper extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory.getLogger(LinkDumper.class);
+ public static final String DUMP_DIR = "linkdump";
+
+ /**
+ * Reader class which will print out the url and all of its inlinks to standard
+ * output. Each inlink will be displayed with its node information including
+ * score and number of in- and outlinks.
+ */
+ public static class Reader {
+
+ public static void main(String[] args) throws Exception {
+
+ if (args == null || args.length < 2) {
+ System.out.println("LinkDumper$Reader usage: <webgraphdb> <url>");
+ return;
+ }
+
+ // open the readers for the linkdump directory
+ Configuration conf = NutchConfiguration.create();
+ FileSystem fs = FileSystem.get(conf);
+ Path webGraphDb = new Path(args[0]);
+ String url = args[1];
+ MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
+ webGraphDb, DUMP_DIR), conf);
+
+ // get the link nodes for the url
+ Text key = new Text(url);
+ LinkNodes nodes = new LinkNodes();
+ MapFileOutputFormat.getEntry(readers,
+ new HashPartitioner<Text, LinkNodes>(), key, nodes);
+
+ // print out the link nodes
+ LinkNode[] linkNodesAr = nodes.getLinks();
+ System.out.println(url + ":");
+ for (LinkNode node : linkNodesAr) {
+ System.out.println(" " + node.getUrl() + " - "
+ + node.getNode().toString());
+ }
+
+ // close the readers
+ FSUtils.closeReaders(readers);
+ }
+ }
+
+ /**
+ * Bean class which holds url to node information.
+ */
+ public static class LinkNode implements Writable {
+
+ private String url = null;
+ private Node node = null;
+
+ public LinkNode() {
+
+ }
+
+ public LinkNode(String url, Node node) {
+ this.url = url;
+ this.node = node;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public Node getNode() {
+ return node;
+ }
+
+ public void setNode(Node node) {
+ this.node = node;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ url = in.readUTF();
+ node = new Node();
+ node.readFields(in);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(url);
+ node.write(out);
+ }
+
+ }
+
+ /**
+ * Writable class which holds an array of LinkNode objects.
+ */
+ public static class LinkNodes implements Writable {
+
+ private LinkNode[] links;
+
+ public LinkNodes() {
+
+ }
+
+ public LinkNodes(LinkNode[] links) {
+ this.links = links;
+ }
+
+ public LinkNode[] getLinks() {
+ return links;
+ }
+
+ public void setLinks(LinkNode[] links) {
+ this.links = links;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ int numLinks = in.readInt();
+ if (numLinks > 0) {
+ links = new LinkNode[numLinks];
+ for (int i = 0; i < numLinks; i++) {
+ LinkNode node = new LinkNode();
+ node.readFields(in);
+ links[i] = node;
+ }
+ }
+ }
+
+ public void write(DataOutput out) throws IOException {
+ if (links != null && links.length > 0) {
+ int numLinks = links.length;
+ out.writeInt(numLinks);
+ for (int i = 0; i < numLinks; i++) {
+ links[i].write(out);
+ }
+ }
+ }
+ }
+
+ /**
+ * Inverts outlinks from the WebGraph to inlinks and attaches node
+ * information.
+ */
+ public static class Inverter implements
+ Mapper<Text, Writable, Text, ObjectWritable>,
+ Reducer<Text, ObjectWritable, Text, LinkNode> {
+
+ private JobConf conf;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Wraps all values in ObjectWritables.
+ */
+ public void map(Text key, Writable value,
+ OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ objWrite.set(value);
+ output.collect(key, objWrite);
+ }
+
+ /**
+ * Inverts outlinks to inlinks while attaching node information to the
+ * outlink.
+ */
+ public void reduce(Text key, Iterator<ObjectWritable> values,
+ OutputCollector<Text, LinkNode> output, Reporter reporter)
+ throws IOException {
+
+ String fromUrl = key.toString();
+ List<LinkDatum> outlinks = new ArrayList<LinkDatum>();
+ Node node = null;
+
+ // loop through all values aggregating outlinks, saving node
+ while (values.hasNext()) {
+ ObjectWritable write = values.next();
+ Object obj = write.get();
+ if (obj instanceof Node) {
+ node = (Node) obj;
+ } else if (obj instanceof LinkDatum) {
+ outlinks.add(WritableUtils.clone((LinkDatum) obj, conf));
+ }
+ }
+
+      // only collect if a node was seen for this url and it has outlinks
+      if (node != null && node.getNumOutlinks() > 0) {
+ for (int i = 0; i < outlinks.size(); i++) {
+ LinkDatum outlink = outlinks.get(i);
+ String toUrl = outlink.getUrl();
+
+ // collect the outlink as an inlink with the node
+ output.collect(new Text(toUrl), new LinkNode(fromUrl, node));
+ }
+ }
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+ * Merges LinkNode objects into a single array value per url. This allows all
+ * values to be quickly retrieved and printed via the Reader tool.
+ */
+ public static class Merger implements
+ Reducer<Text, LinkNode, Text, LinkNodes> {
+
+ private JobConf conf;
+ private int maxInlinks = 50000;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Aggregate all LinkNode objects for a given url.
+ */
+ public void reduce(Text key, Iterator<LinkNode> values,
+ OutputCollector<Text, LinkNodes> output, Reporter reporter)
+ throws IOException {
+
+ List<LinkNode> nodeList = new ArrayList<LinkNode>();
+ int numNodes = 0;
+
+ while (values.hasNext()) {
+ LinkNode cur = values.next();
+ if (numNodes < maxInlinks) {
+ nodeList.add(WritableUtils.clone(cur, conf));
+ numNodes++;
+ } else {
+ break;
+ }
+ }
+
+ LinkNode[] linkNodesAr = nodeList.toArray(new LinkNode[nodeList.size()]);
+ LinkNodes linkNodes = new LinkNodes(linkNodesAr);
+ output.collect(key, linkNodes);
+ }
+
+ public void close() {
+
+ }
+ }
+
+ /**
+ * Runs the inverter and merger jobs of the LinkDumper tool to create the url
+ * to inlink node database.
+ */
+ public void dumpLinks(Path webGraphDb) throws IOException {
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("NodeDumper: starting at " + sdf.format(start));
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ Path linkdump = new Path(webGraphDb, DUMP_DIR);
+ Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ Path outlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+
+ // run the inverter job
+ Path tempInverted = new Path(webGraphDb, "inverted-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+ JobConf inverter = new NutchJob(conf);
+ inverter.setJobName("LinkDumper: inverter");
+ FileInputFormat.addInputPath(inverter, nodeDb);
+ FileInputFormat.addInputPath(inverter, outlinkDb);
+ inverter.setInputFormat(SequenceFileInputFormat.class);
+ inverter.setMapperClass(Inverter.class);
+ inverter.setReducerClass(Inverter.class);
+ inverter.setMapOutputKeyClass(Text.class);
+ inverter.setMapOutputValueClass(ObjectWritable.class);
+ inverter.setOutputKeyClass(Text.class);
+ inverter.setOutputValueClass(LinkNode.class);
+ FileOutputFormat.setOutputPath(inverter, tempInverted);
+ inverter.setOutputFormat(SequenceFileOutputFormat.class);
+
+ try {
+ LOG.info("LinkDumper: running inverter");
+ JobClient.runJob(inverter);
+ LOG.info("LinkDumper: finished inverter");
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ // run the merger job
+ JobConf merger = new NutchJob(conf);
+ merger.setJobName("LinkDumper: merger");
+ FileInputFormat.addInputPath(merger, tempInverted);
+ merger.setInputFormat(SequenceFileInputFormat.class);
+ merger.setReducerClass(Merger.class);
+ merger.setMapOutputKeyClass(Text.class);
+ merger.setMapOutputValueClass(LinkNode.class);
+ merger.setOutputKeyClass(Text.class);
+ merger.setOutputValueClass(LinkNodes.class);
+ FileOutputFormat.setOutputPath(merger, linkdump);
+ merger.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+ LOG.info("LinkDumper: running merger");
+ JobClient.runJob(merger);
+ LOG.info("LinkDumper: finished merger");
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ fs.delete(tempInverted, true);
+ long end = System.currentTimeMillis();
+ LOG.info("LinkDumper: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new LinkDumper(),
+ args);
+ System.exit(res);
+ }
+
+ /**
+   * Runs the LinkDumper tool. This simply creates the database; to read the
+   * values, the nested Reader tool must be used.
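+   * <p>
+   * A minimal programmatic invocation, mirroring {@link #main(String[])}; the
+   * webgraphdb path below is only a placeholder:
+   *
+   * <pre>{@code
+   * int res = ToolRunner.run(NutchConfiguration.create(), new LinkDumper(),
+   *     new String[] { "-webgraphdb", "crawl/webgraphdb" });
+   * }</pre>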
+ */
+ public int run(String[] args) throws Exception {
+
+ Options options = new Options();
+ OptionBuilder.withArgName("help");
+ OptionBuilder.withDescription("show this help message");
+ Option helpOpts = OptionBuilder.create("help");
+ options.addOption(helpOpts);
+
+ OptionBuilder.withArgName("webgraphdb");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("the web graph database to use");
+ Option webGraphDbOpts = OptionBuilder.create("webgraphdb");
+ options.addOption(webGraphDbOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("LinkDumper", options);
+ return -1;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ dumpLinks(new Path(webGraphDb));
+ return 0;
+ } catch (Exception e) {
+ LOG.error("LinkDumper: " + StringUtils.stringifyException(e));
+ return -2;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java
new file mode 100644
index 0000000..bd22828
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/LinkRank.java
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+public class LinkRank extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory.getLogger(LinkRank.class);
+ private static final String NUM_NODES = "_num_nodes_";
+
+ /**
+   * Runs the counter job. The counter job determines the number of nodes in
+   * the webgraph. This is used during analysis.
+ *
+ * @param fs
+ * The job file system.
+ * @param webGraphDb
+ * The web graph database to use.
+ *
+ * @return The number of nodes in the web graph.
+ * @throws IOException
+ * If an error occurs while running the counter job.
+ */
+ private int runCounter(FileSystem fs, Path webGraphDb) throws IOException {
+
+ // configure the counter job
+ Path numLinksPath = new Path(webGraphDb, NUM_NODES);
+ Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ JobConf counter = new NutchJob(getConf());
+ counter.setJobName("LinkRank Counter");
+ FileInputFormat.addInputPath(counter, nodeDb);
+ FileOutputFormat.setOutputPath(counter, numLinksPath);
+ counter.setInputFormat(SequenceFileInputFormat.class);
+ counter.setMapperClass(Counter.class);
+ counter.setCombinerClass(Counter.class);
+ counter.setReducerClass(Counter.class);
+ counter.setMapOutputKeyClass(Text.class);
+ counter.setMapOutputValueClass(LongWritable.class);
+ counter.setOutputKeyClass(Text.class);
+ counter.setOutputValueClass(LongWritable.class);
+ counter.setNumReduceTasks(1);
+ counter.setOutputFormat(TextOutputFormat.class);
+ counter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+ false);
+
+    // run the counter job; it outputs to a single reduce task and file
+ LOG.info("Starting link counter job");
+ try {
+ JobClient.runJob(counter);
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ LOG.info("Finished link counter job");
+
+    // read the first (and only) line from the file, which should hold the
+    // number of nodes in the web graph
+ LOG.info("Reading numlinks temp file");
+ FSDataInputStream readLinks = fs.open(new Path(numLinksPath, "part-00000"));
+ BufferedReader buffer = new BufferedReader(new InputStreamReader(readLinks));
+ String numLinksLine = buffer.readLine();
+ readLinks.close();
+
+    // check that there is a node count to read; if none, the webgraph may be empty
+ if (numLinksLine == null || numLinksLine.length() == 0) {
+ fs.delete(numLinksPath, true);
+ throw new IOException("No links to process, is the webgraph empty?");
+ }
+
+    // delete the temp file, then parse and return the node count as an int
+ LOG.info("Deleting numlinks temp file");
+ fs.delete(numLinksPath, true);
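+    // TextOutputFormat writes "key<TAB>value", e.g. "_num_nodes_\t1234", so
+    // the count is the second whitespace-separated field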
+ String numLinks = numLinksLine.split("\\s+")[1];
+ return Integer.parseInt(numLinks);
+ }
+
+ /**
+ * Runs the initializer job. The initializer job sets up the nodes with a
+ * default starting score for link analysis.
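+   * The starting score is read from the link.analyze.initial.score property
+   * (1.0 by default).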
+ *
+ * @param nodeDb
+ * The node database to use.
+ * @param output
+ * The job output directory.
+ *
+ * @throws IOException
+ * If an error occurs while running the initializer job.
+ */
+ private void runInitializer(Path nodeDb, Path output) throws IOException {
+
+ // configure the initializer
+ JobConf initializer = new NutchJob(getConf());
+ initializer.setJobName("LinkAnalysis Initializer");
+ FileInputFormat.addInputPath(initializer, nodeDb);
+ FileOutputFormat.setOutputPath(initializer, output);
+ initializer.setInputFormat(SequenceFileInputFormat.class);
+ initializer.setMapperClass(Initializer.class);
+ initializer.setMapOutputKeyClass(Text.class);
+ initializer.setMapOutputValueClass(Node.class);
+ initializer.setOutputKeyClass(Text.class);
+ initializer.setOutputValueClass(Node.class);
+ initializer.setOutputFormat(MapFileOutputFormat.class);
+ initializer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+ false);
+
+ // run the initializer
+ LOG.info("Starting initialization job");
+ try {
+ JobClient.runJob(initializer);
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ LOG.info("Finished initialization job.");
+ }
+
+ /**
+ * Runs the inverter job. The inverter job flips outlinks to inlinks to be
+ * passed into the analysis job.
+ *
+ * @param nodeDb
+ * The node database to use.
+ * @param outlinkDb
+ * The outlink database to use.
+ * @param output
+ * The output directory.
+ *
+ * @throws IOException
+ * If an error occurs while running the inverter job.
+ */
+ private void runInverter(Path nodeDb, Path outlinkDb, Path output)
+ throws IOException {
+
+ // configure the inverter
+ JobConf inverter = new NutchJob(getConf());
+ inverter.setJobName("LinkAnalysis Inverter");
+ FileInputFormat.addInputPath(inverter, nodeDb);
+ FileInputFormat.addInputPath(inverter, outlinkDb);
+ FileOutputFormat.setOutputPath(inverter, output);
+ inverter.setInputFormat(SequenceFileInputFormat.class);
+ inverter.setMapperClass(Inverter.class);
+ inverter.setReducerClass(Inverter.class);
+ inverter.setMapOutputKeyClass(Text.class);
+ inverter.setMapOutputValueClass(ObjectWritable.class);
+ inverter.setOutputKeyClass(Text.class);
+ inverter.setOutputValueClass(LinkDatum.class);
+ inverter.setOutputFormat(SequenceFileOutputFormat.class);
+ inverter.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+ false);
+
+ // run the inverter job
+ LOG.info("Starting inverter job");
+ try {
+ JobClient.runJob(inverter);
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ LOG.info("Finished inverter job.");
+ }
+
+ /**
+ * Runs the link analysis job. The link analysis job applies the link rank
+ * formula to create a score per url and stores that score in the NodeDb.
+ *
+ * Typically the link analysis job is run a number of times to allow the link
+ * rank scores to converge.
+ *
+ * @param nodeDb
+ * The node database from which we are getting previous link rank
+ * scores.
+ * @param inverted
+ * The inverted inlinks
+ * @param output
+ * The link analysis output.
+ * @param iteration
+ * The current iteration number.
+ * @param numIterations
+ * The total number of link analysis iterations
+ *
+ * @throws IOException
+ * If an error occurs during link analysis.
+ */
+ private void runAnalysis(Path nodeDb, Path inverted, Path output,
+ int iteration, int numIterations, float rankOne) throws IOException {
+
+ JobConf analyzer = new NutchJob(getConf());
+ analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
+ analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
+ + " of " + numIterations);
+ FileInputFormat.addInputPath(analyzer, nodeDb);
+ FileInputFormat.addInputPath(analyzer, inverted);
+ FileOutputFormat.setOutputPath(analyzer, output);
+ analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
+ analyzer.setMapOutputKeyClass(Text.class);
+ analyzer.setMapOutputValueClass(ObjectWritable.class);
+ analyzer.setInputFormat(SequenceFileInputFormat.class);
+ analyzer.setMapperClass(Analyzer.class);
+ analyzer.setReducerClass(Analyzer.class);
+ analyzer.setOutputKeyClass(Text.class);
+ analyzer.setOutputValueClass(Node.class);
+ analyzer.setOutputFormat(MapFileOutputFormat.class);
+ analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+ false);
+
+ LOG.info("Starting analysis job");
+ try {
+ JobClient.runJob(analyzer);
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ LOG.info("Finished analysis job.");
+ }
+
+ /**
+ * The Counter job that determines the total number of nodes in the WebGraph.
+   * This is used to determine a rank one score for pages that have zero
+   * inlinks but contain outlinks.
+ */
+ private static class Counter implements
+ Mapper<Text, Node, Text, LongWritable>,
+ Reducer<Text, LongWritable, Text, LongWritable> {
+
+ private static Text numNodes = new Text(NUM_NODES);
+ private static LongWritable one = new LongWritable(1L);
+
+ public void configure(JobConf conf) {
+ }
+
+ /**
+ * Outputs one for every node.
+ */
+ public void map(Text key, Node value,
+ OutputCollector<Text, LongWritable> output, Reporter reporter)
+ throws IOException {
+ output.collect(numNodes, one);
+ }
+
+ /**
+     * Sums the per-node counts and outputs a single total value.
+ */
+ public void reduce(Text key, Iterator<LongWritable> values,
+ OutputCollector<Text, LongWritable> output, Reporter reporter)
+ throws IOException {
+
+ long total = 0;
+ while (values.hasNext()) {
+ total += values.next().get();
+ }
+ output.collect(numNodes, new LongWritable(total));
+ }
+
+ public void close() {
+ }
+ }
+
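+  /**
+   * Assigns each node its starting inlink score (the
+   * link.analyze.initial.score property, 1.0 by default) before the first
+   * analysis iteration.
+   */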
+ private static class Initializer implements Mapper<Text, Node, Text, Node> {
+
+ private JobConf conf;
+ private float initialScore = 1.0f;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ initialScore = conf.getFloat("link.analyze.initial.score", 1.0f);
+ }
+
+ public void map(Text key, Node node, OutputCollector<Text, Node> output,
+ Reporter reporter) throws IOException {
+
+ String url = key.toString();
+ Node outNode = WritableUtils.clone(node, conf);
+ outNode.setInlinkScore(initialScore);
+
+ output.collect(new Text(url), outNode);
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+   * Inverts outlinks and attaches the current score from the NodeDb of the
+   * WebGraph. The link analysis process consists of inverting, analyzing, and
+   * scoring, repeated in a loop for a given number of iterations.
+ */
+ private static class Inverter implements
+ Mapper<Text, Writable, Text, ObjectWritable>,
+ Reducer<Text, ObjectWritable, Text, LinkDatum> {
+
+ private JobConf conf;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Convert values to ObjectWritable
+ */
+ public void map(Text key, Writable value,
+ OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ objWrite.set(value);
+ output.collect(key, objWrite);
+ }
+
+ /**
+     * Inverts outlinks to inlinks, attaching the current score for each
+     * outlink from the NodeDb of the WebGraph.
+ */
+ public void reduce(Text key, Iterator<ObjectWritable> values,
+ OutputCollector<Text, LinkDatum> output, Reporter reporter)
+ throws IOException {
+
+ String fromUrl = key.toString();
+ List<LinkDatum> outlinks = new ArrayList<LinkDatum>();
+ Node node = null;
+
+      // aggregate outlinks and capture the node for this url
+ while (values.hasNext()) {
+ ObjectWritable write = values.next();
+ Object obj = write.get();
+ if (obj instanceof Node) {
+ node = (Node) obj;
+ } else if (obj instanceof LinkDatum) {
+ outlinks.add(WritableUtils.clone((LinkDatum) obj, conf));
+ }
+ }
+
+ // get the number of outlinks and the current inlink and outlink scores
+ // from the node of the url
+ int numOutlinks = node.getNumOutlinks();
+ float inlinkScore = node.getInlinkScore();
+ float outlinkScore = node.getOutlinkScore();
+ LOG.debug(fromUrl + ": num outlinks " + numOutlinks);
+
+ // can't invert if no outlinks
+ if (numOutlinks > 0) {
+ for (int i = 0; i < outlinks.size(); i++) {
+ LinkDatum outlink = outlinks.get(i);
+ String toUrl = outlink.getUrl();
+
+ outlink.setUrl(fromUrl);
+ outlink.setScore(outlinkScore);
+
+ // collect the inverted outlink
+ output.collect(new Text(toUrl), outlink);
+ LOG.debug(toUrl + ": inverting inlink from " + fromUrl
+ + " origscore: " + inlinkScore + " numOutlinks: " + numOutlinks
+ + " inlinkscore: " + outlinkScore);
+ }
+ }
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+ * Runs a single link analysis iteration.
+ */
+ private static class Analyzer implements
+ Mapper<Text, Writable, Text, ObjectWritable>,
+ Reducer<Text, ObjectWritable, Text, Node> {
+
+ private JobConf conf;
+ private float dampingFactor = 0.85f;
+ private float rankOne = 0.0f;
+ private int itNum = 0;
+ private boolean limitPages = true;
+ private boolean limitDomains = true;
+
+ /**
+     * Configures the job: sets the damping factor, rank one score, and other
+     * values needed for analysis.
+ */
+ public void configure(JobConf conf) {
+
+ try {
+ this.conf = conf;
+ this.dampingFactor = conf
+ .getFloat("link.analyze.damping.factor", 0.85f);
+ this.rankOne = conf.getFloat("link.analyze.rank.one", 0.0f);
+ this.itNum = conf.getInt("link.analyze.iteration", 0);
+ limitPages = conf.getBoolean("link.ignore.limit.page", true);
+ limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Convert values to ObjectWritable
+ */
+ public void map(Text key, Writable value,
+ OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ objWrite.set(WritableUtils.clone(value, conf));
+ output.collect(key, objWrite);
+ }
+
+ /**
+ * Performs a single iteration of link analysis. The resulting scores are
+ * stored in a temporary NodeDb which replaces the NodeDb of the WebGraph.
+ */
+ public void reduce(Text key, Iterator<ObjectWritable> values,
+ OutputCollector<Text, Node> output, Reporter reporter)
+ throws IOException {
+
+ String url = key.toString();
+ Set<String> domains = new HashSet<String>();
+ Set<String> pages = new HashSet<String>();
+ Node node = null;
+
+ // a page with zero inlinks has a score of rankOne
+ int numInlinks = 0;
+ float totalInlinkScore = rankOne;
+
+ while (values.hasNext()) {
+
+ ObjectWritable next = values.next();
+ Object value = next.get();
+ if (value instanceof Node) {
+ node = (Node) value;
+ } else if (value instanceof LinkDatum) {
+
+ LinkDatum linkDatum = (LinkDatum) value;
+ float scoreFromInlink = linkDatum.getScore();
+ String inlinkUrl = linkDatum.getUrl();
+ String inLinkDomain = URLUtil.getDomainName(inlinkUrl);
+ String inLinkPage = URLUtil.getPage(inlinkUrl);
+
+ // limit counting duplicate inlinks by pages or domains
+ if ((limitPages && pages.contains(inLinkPage))
+ || (limitDomains && domains.contains(inLinkDomain))) {
+ LOG.debug(url + ": ignoring " + scoreFromInlink + " from "
+ + inlinkUrl + ", duplicate page or domain");
+ continue;
+ }
+
+ // aggregate total inlink score
+ numInlinks++;
+ totalInlinkScore += scoreFromInlink;
+ domains.add(inLinkDomain);
+ pages.add(inLinkPage);
+ LOG.debug(url + ": adding " + scoreFromInlink + " from " + inlinkUrl
+ + ", total: " + totalInlinkScore);
+ }
+ }
+
+      // calculate the linkRank score
+ float linkRankScore = (1 - this.dampingFactor)
+ + (this.dampingFactor * totalInlinkScore);
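+      // e.g. with the default damping factor of 0.85 and a hypothetical
+      // totalInlinkScore of 2.0: 0.15 + 0.85 * 2.0 = 1.85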
+
+ LOG.debug(url + ": score: " + linkRankScore + " num inlinks: "
+ + numInlinks + " iteration: " + itNum);
+
+ // store the score in a temporary NodeDb
+ Node outNode = WritableUtils.clone(node, conf);
+ outNode.setInlinkScore(linkRankScore);
+ output.collect(key, outNode);
+ }
+
+ public void close() throws IOException {
+ }
+ }
+
+ /**
+ * Default constructor.
+ */
+ public LinkRank() {
+ super();
+ }
+
+ /**
+ * Configurable constructor.
+ */
+ public LinkRank(Configuration conf) {
+ super(conf);
+ }
+
+ public void close() {
+ }
+
+ /**
+   * Runs the complete link analysis job. The complete job first determines the
+   * rank one score, then runs through a given number of invert and analyze
+   * iterations (10 by default), and finally replaces the NodeDb in the
+   * WebGraph with the link rank output.
+ *
+ * @param webGraphDb
+ * The WebGraph to run link analysis on.
+ *
+ * @throws IOException
+ * If an error occurs during link analysis.
+ */
+ public void analyze(Path webGraphDb) throws IOException {
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("Analysis: starting at " + sdf.format(start));
+
+    // store the link rank under the webgraphdb temporarily; final scores get
+    // updated into the nodedb
+ Path linkRank = new Path(webGraphDb, "linkrank");
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ // create the linkrank directory if needed
+ if (!fs.exists(linkRank)) {
+ fs.mkdirs(linkRank);
+ }
+
+ // the webgraph outlink and node database paths
+ Path wgOutlinkDb = new Path(webGraphDb, WebGraph.OUTLINK_DIR);
+ Path wgNodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ Path nodeDb = new Path(linkRank, WebGraph.NODE_DIR);
+
+    // get the total number of nodes in the webgraph, used for rank one, then
+    // initialize all urls with a default score
+ int numLinks = runCounter(fs, webGraphDb);
+ runInitializer(wgNodeDb, nodeDb);
+ float rankOneScore = (1f / (float) numLinks);
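+    // e.g. a hypothetical webgraph of 1000 nodes yields a rank one score of
+    // 1 / 1000 = 0.001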
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Analysis: Number of links: " + numLinks);
+ LOG.info("Analysis: Rank One: " + rankOneScore);
+ }
+
+ // run invert and analysis for a given number of iterations to allow the
+ // link rank scores to converge
+ int numIterations = conf.getInt("link.analyze.num.iterations", 10);
+ for (int i = 0; i < numIterations; i++) {
+
+ // the input to inverting is always the previous output from analysis
+ LOG.info("Analysis: Starting iteration " + (i + 1) + " of "
+ + numIterations);
+ Path tempRank = new Path(linkRank + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+ fs.mkdirs(tempRank);
+ Path tempInverted = new Path(tempRank, "inverted");
+ Path tempNodeDb = new Path(tempRank, WebGraph.NODE_DIR);
+
+ // run invert and analysis
+ runInverter(nodeDb, wgOutlinkDb, tempInverted);
+ runAnalysis(nodeDb, tempInverted, tempNodeDb, i, numIterations,
+ rankOneScore);
+
+ // replace the temporary NodeDb with the output from analysis
+ LOG.info("Analysis: Installing new link scores");
+ FSUtils.replace(fs, linkRank, tempRank, true);
+ LOG.info("Analysis: finished iteration " + (i + 1) + " of "
+ + numIterations);
+ }
+
+ // replace the NodeDb in the WebGraph with the final output of analysis
+ LOG.info("Analysis: Installing web graph nodes");
+ FSUtils.replace(fs, wgNodeDb, nodeDb, true);
+
+ // remove the temporary link rank folder
+ fs.delete(linkRank, true);
+ long end = System.currentTimeMillis();
+ LOG.info("Analysis: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new LinkRank(), args);
+ System.exit(res);
+ }
+
+ /**
+ * Runs the LinkRank tool.
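+   * <p>
+   * A minimal programmatic invocation, mirroring {@link #main(String[])}; the
+   * webgraphdb path below is only a placeholder:
+   *
+   * <pre>{@code
+   * int res = ToolRunner.run(NutchConfiguration.create(), new LinkRank(),
+   *     new String[] { "-webgraphdb", "crawl/webgraphdb" });
+   * }</pre>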
+ */
+ public int run(String[] args) throws Exception {
+
+ Options options = new Options();
+ OptionBuilder.withArgName("help");
+ OptionBuilder.withDescription("show this help message");
+ Option helpOpts = OptionBuilder.create("help");
+ options.addOption(helpOpts);
+
+ OptionBuilder.withArgName("webgraphdb");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("the web graph db to use");
+ Option webgraphOpts = OptionBuilder.create("webgraphdb");
+ options.addOption(webgraphOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("LinkRank", options);
+ return -1;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+
+ analyze(new Path(webGraphDb));
+ return 0;
+ } catch (Exception e) {
+ LOG.error("LinkAnalysis: " + StringUtils.stringifyException(e));
+ return -2;
+ }
+ }
+}