Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:55 UTC
[39/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build
for nutch-core and nutch-plugins
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java
new file mode 100644
index 0000000..a35e842
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/Node.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.nutch.metadata.Metadata;
+
+/**
+ * A class which holds the number of inlinks and outlinks for a given url along
+ * with an inlink score from a link analysis program and any metadata.
+ *
+ * The Node is the core unit of the NodeDb in the WebGraph.
+ */
+public class Node implements Writable {
+
+ private int numInlinks = 0;
+ private int numOutlinks = 0;
+ private float inlinkScore = 1.0f;
+ private Metadata metadata = new Metadata();
+
+ public Node() {
+
+ }
+
+ public int getNumInlinks() {
+ return numInlinks;
+ }
+
+ public void setNumInlinks(int numInlinks) {
+ this.numInlinks = numInlinks;
+ }
+
+ public int getNumOutlinks() {
+ return numOutlinks;
+ }
+
+ public void setNumOutlinks(int numOutlinks) {
+ this.numOutlinks = numOutlinks;
+ }
+
+ public float getInlinkScore() {
+ return inlinkScore;
+ }
+
+ public void setInlinkScore(float inlinkScore) {
+ this.inlinkScore = inlinkScore;
+ }
+
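+  /**
+   * Returns the outlink score: the inlink score divided evenly across this
+   * node's outlinks, or the inlink score itself if there are no outlinks.
+   */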
+ public float getOutlinkScore() {
+ return (numOutlinks > 0) ? inlinkScore / numOutlinks : inlinkScore;
+ }
+
+ public Metadata getMetadata() {
+ return metadata;
+ }
+
+ public void setMetadata(Metadata metadata) {
+ this.metadata = metadata;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+
+ numInlinks = in.readInt();
+ numOutlinks = in.readInt();
+ inlinkScore = in.readFloat();
+ metadata.clear();
+ metadata.readFields(in);
+ }
+
+ public void write(DataOutput out) throws IOException {
+
+ out.writeInt(numInlinks);
+ out.writeInt(numOutlinks);
+ out.writeFloat(inlinkScore);
+ metadata.write(out);
+ }
+
+ public String toString() {
+ return "num inlinks: " + numInlinks + ", num outlinks: " + numOutlinks
+ + ", inlink score: " + inlinkScore + ", outlink score: "
+ + getOutlinkScore() + ", metadata: " + metadata.toString();
+ }
+
+}
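For reference, a minimal sketch (not part of this commit) of the Writable
round-trip that Node implements above; the driver class and the field values
are hypothetical, everything else follows the code:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.nutch.scoring.webgraph.Node;

public class NodeRoundTrip {
  public static void main(String[] args) throws IOException {
    Node node = new Node();
    node.setNumInlinks(12);
    node.setNumOutlinks(4);
    node.setInlinkScore(2.0f);

    // serialize the node as Hadoop would when writing the NodeDb
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    node.write(new DataOutputStream(bytes));

    // deserialize into a fresh instance
    Node copy = new Node();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));

    // prints an outlink score of 0.5 (inlink score 2.0 over 4 outlinks)
    System.out.println(copy);
  }
}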
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
new file mode 100644
index 0000000..4a57c29
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeDumper.java
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * A tool that dumps the top urls by number of inlinks, number of outlinks,
+ * or by score to a text file. One of the major uses of this tool is to check
+ * the top-scoring urls of a link analysis program such as LinkRank.
+ *
+ * For number of inlinks or outlinks the WebGraph program must have been run
+ * first. For the link analysis score, a program such as LinkRank, which
+ * updates the NodeDb of the WebGraph, must also have been run.
+ */
+public class NodeDumper extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory.getLogger(NodeDumper.class);
+
+ private static enum DumpType {
+ INLINKS, OUTLINKS, SCORES
+ }
+
+ private static enum AggrType {
+ SUM, MAX
+ }
+
+ private static enum NameType {
+ HOST, DOMAIN
+ }
+
+ /**
+ * Outputs the top urls sorted in descending order. Depending on the flag set
+ * on the command line, the top urls could be for number of inlinks, for
+ * number of outlinks, or for link analysis score.
+ */
+ public static class Sorter extends Configured implements
+ Mapper<Text, Node, FloatWritable, Text>,
+ Reducer<FloatWritable, Text, Text, FloatWritable> {
+
+ private JobConf conf;
+ private boolean inlinks = false;
+ private boolean outlinks = false;
+ private boolean scores = false;
+ private long topn = Long.MAX_VALUE;
+
+ /**
+ * Configures the job, sets the flag for type of content and the topN number
+ * if any.
+ */
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ this.inlinks = conf.getBoolean("inlinks", false);
+ this.outlinks = conf.getBoolean("outlinks", false);
+ this.scores = conf.getBoolean("scores", true);
+ this.topn = conf.getLong("topn", Long.MAX_VALUE);
+ }
+
+ public void close() {
+ }
+
+ /**
+     * Outputs the url with the appropriate value: number of inlinks, number
+     * of outlinks, or score.
+ */
+ public void map(Text key, Node node,
+ OutputCollector<FloatWritable, Text> output, Reporter reporter)
+ throws IOException {
+
+ float number = 0;
+ if (inlinks) {
+ number = node.getNumInlinks();
+ } else if (outlinks) {
+ number = node.getNumOutlinks();
+ } else {
+ number = node.getInlinkScore();
+ }
+
+      // collect the negated value so the results sort in descending order
+ output.collect(new FloatWritable(-number), key);
+ }
+
+ /**
+ * Flips and collects the url and numeric sort value.
+ */
+ public void reduce(FloatWritable key, Iterator<Text> values,
+ OutputCollector<Text, FloatWritable> output, Reporter reporter)
+ throws IOException {
+
+      // negate again to recover the original value; special-case zero to
+      // avoid emitting -0.0
+ float val = key.get();
+ FloatWritable number = new FloatWritable(val == 0 ? 0 : -val);
+ long numCollected = 0;
+
+ // collect all values, this time with the url as key
+ while (values.hasNext() && (numCollected < topn)) {
+ Text url = WritableUtils.clone(values.next(), conf);
+ output.collect(url, number);
+ numCollected++;
+ }
+ }
+ }
+
+ /**
+ * Outputs the hosts or domains with an associated value. This value consists
+ * of either the number of inlinks, the number of outlinks or the score. The
+ * computed value is then either the sum of all parts or the top value.
+ */
+ public static class Dumper extends Configured implements
+ Mapper<Text, Node, Text, FloatWritable>,
+ Reducer<Text, FloatWritable, Text, FloatWritable> {
+
+ private JobConf conf;
+ private boolean inlinks = false;
+ private boolean outlinks = false;
+ private boolean scores = false;
+ private long topn = Long.MAX_VALUE;
+ private boolean host = false;
+ private boolean domain = false;
+ private boolean sum = false;
+ private boolean max = false;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ this.inlinks = conf.getBoolean("inlinks", false);
+ this.outlinks = conf.getBoolean("outlinks", false);
+ this.scores = conf.getBoolean("scores", true);
+ this.topn = conf.getLong("topn", Long.MAX_VALUE);
+ this.host = conf.getBoolean("host", false);
+ this.domain = conf.getBoolean("domain", false);
+ this.sum = conf.getBoolean("sum", false);
+ this.max = conf.getBoolean("max", false);
+ }
+
+ public void close() {
+ }
+
+ /**
+ * Outputs the host or domain as key for this record and numInlinks,
+ * numOutlinks or score as the value.
+ */
+ public void map(Text key, Node node,
+ OutputCollector<Text, FloatWritable> output, Reporter reporter)
+ throws IOException {
+
+ float number = 0;
+ if (inlinks) {
+ number = node.getNumInlinks();
+ } else if (outlinks) {
+ number = node.getNumOutlinks();
+ } else {
+ number = node.getInlinkScore();
+ }
+
+ if (host) {
+ key.set(URLUtil.getHost(key.toString()));
+ } else {
+ key.set(URLUtil.getDomainName(key.toString()));
+ }
+
+ output.collect(key, new FloatWritable(number));
+ }
+
+ /**
+ * Outputs either the sum or the top value for this record.
+ */
+ public void reduce(Text key, Iterator<FloatWritable> values,
+ OutputCollector<Text, FloatWritable> output, Reporter reporter)
+ throws IOException {
+
+ long numCollected = 0;
+ float sumOrMax = 0;
+ float val = 0;
+
+      // aggregate (sum or max) the values for this host or domain
+ while (values.hasNext() && (numCollected < topn)) {
+ val = values.next().get();
+
+ if (sum) {
+ sumOrMax += val;
+ } else {
+ if (sumOrMax < val) {
+ sumOrMax = val;
+ }
+ }
+
+ numCollected++;
+ }
+
+ output.collect(key, new FloatWritable(sumOrMax));
+ }
+ }
+
+ /**
+ * Runs the process to dump the top urls out to a text file.
+ *
+ * @param webGraphDb
+ * The WebGraph from which to pull values.
+ *
+   * @param topN
+   *          The number of top urls to dump.
+   * @param output
+   *          Where to write the output.
+ *
+ * @throws IOException
+ * If an error occurs while dumping the top values.
+ */
+ public void dumpNodes(Path webGraphDb, DumpType type, long topN, Path output,
+ boolean asEff, NameType nameType, AggrType aggrType,
+ boolean asSequenceFile) throws Exception {
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("NodeDumper: starting at " + sdf.format(start));
+ Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ Configuration conf = getConf();
+
+ JobConf dumper = new NutchJob(conf);
+ dumper.setJobName("NodeDumper: " + webGraphDb);
+ FileInputFormat.addInputPath(dumper, nodeDb);
+ dumper.setInputFormat(SequenceFileInputFormat.class);
+
+ if (nameType == null) {
+ dumper.setMapperClass(Sorter.class);
+ dumper.setReducerClass(Sorter.class);
+ dumper.setMapOutputKeyClass(FloatWritable.class);
+ dumper.setMapOutputValueClass(Text.class);
+ } else {
+ dumper.setMapperClass(Dumper.class);
+ dumper.setReducerClass(Dumper.class);
+ dumper.setMapOutputKeyClass(Text.class);
+ dumper.setMapOutputValueClass(FloatWritable.class);
+ }
+
+ dumper.setOutputKeyClass(Text.class);
+ dumper.setOutputValueClass(FloatWritable.class);
+ FileOutputFormat.setOutputPath(dumper, output);
+
+ if (asSequenceFile) {
+ dumper.setOutputFormat(SequenceFileOutputFormat.class);
+ } else {
+ dumper.setOutputFormat(TextOutputFormat.class);
+ }
+
+ dumper.setNumReduceTasks(1);
+ dumper.setBoolean("inlinks", type == DumpType.INLINKS);
+ dumper.setBoolean("outlinks", type == DumpType.OUTLINKS);
+ dumper.setBoolean("scores", type == DumpType.SCORES);
+
+ dumper.setBoolean("host", nameType == NameType.HOST);
+ dumper.setBoolean("domain", nameType == NameType.DOMAIN);
+ dumper.setBoolean("sum", aggrType == AggrType.SUM);
+ dumper.setBoolean("max", aggrType == AggrType.MAX);
+
+ dumper.setLong("topn", topN);
+
+ // Set equals-sign as separator for Solr's ExternalFileField
+ if (asEff) {
+ dumper.set("mapred.textoutputformat.separator", "=");
+ }
+
+ try {
+ LOG.info("NodeDumper: running");
+ JobClient.runJob(dumper);
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+ long end = System.currentTimeMillis();
+ LOG.info("NodeDumper: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new NodeDumper(),
+ args);
+ System.exit(res);
+ }
+
+ /**
+ * Runs the node dumper tool.
+ */
+ public int run(String[] args) throws Exception {
+
+ Options options = new Options();
+ OptionBuilder.withArgName("help");
+ OptionBuilder.withDescription("show this help message");
+ Option helpOpts = OptionBuilder.create("help");
+ options.addOption(helpOpts);
+
+ OptionBuilder.withArgName("webgraphdb");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("the web graph database to use");
+ Option webGraphDbOpts = OptionBuilder.create("webgraphdb");
+ options.addOption(webGraphDbOpts);
+
+ OptionBuilder.withArgName("inlinks");
+ OptionBuilder.withDescription("show highest inlinks");
+ Option inlinkOpts = OptionBuilder.create("inlinks");
+ options.addOption(inlinkOpts);
+
+ OptionBuilder.withArgName("outlinks");
+ OptionBuilder.withDescription("show highest outlinks");
+ Option outlinkOpts = OptionBuilder.create("outlinks");
+ options.addOption(outlinkOpts);
+
+ OptionBuilder.withArgName("scores");
+ OptionBuilder.withDescription("show highest scores");
+ Option scoreOpts = OptionBuilder.create("scores");
+ options.addOption(scoreOpts);
+
+ OptionBuilder.withArgName("topn");
+ OptionBuilder.hasOptionalArg();
+ OptionBuilder.withDescription("show topN scores");
+ Option topNOpts = OptionBuilder.create("topn");
+ options.addOption(topNOpts);
+
+ OptionBuilder.withArgName("output");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("the output directory to use");
+ Option outputOpts = OptionBuilder.create("output");
+ options.addOption(outputOpts);
+
+ OptionBuilder.withArgName("asEff");
+ OptionBuilder
+ .withDescription("Solr ExternalFileField compatible output format");
+ Option effOpts = OptionBuilder.create("asEff");
+ options.addOption(effOpts);
+
+ OptionBuilder.hasArgs(2);
+ OptionBuilder.withDescription("group <host|domain> <sum|max>");
+ Option groupOpts = OptionBuilder.create("group");
+ options.addOption(groupOpts);
+
+ OptionBuilder.withArgName("asSequenceFile");
+ OptionBuilder.withDescription("whether to output as a sequencefile");
+ Option sequenceFileOpts = OptionBuilder.create("asSequenceFile");
+ options.addOption(sequenceFileOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("NodeDumper", options);
+ return -1;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ boolean inlinks = line.hasOption("inlinks");
+ boolean outlinks = line.hasOption("outlinks");
+
+ long topN = (line.hasOption("topn") ? Long.parseLong(line
+ .getOptionValue("topn")) : Long.MAX_VALUE);
+
+ // get the correct dump type
+ String output = line.getOptionValue("output");
+ DumpType type = (inlinks ? DumpType.INLINKS
+ : outlinks ? DumpType.OUTLINKS : DumpType.SCORES);
+
+ NameType nameType = null;
+ AggrType aggrType = null;
+ String[] group = line.getOptionValues("group");
+ if (group != null && group.length == 2) {
+ nameType = (group[0].equals("host") ? NameType.HOST : group[0]
+ .equals("domain") ? NameType.DOMAIN : null);
+        aggrType = (group[1].equals("sum") ? AggrType.SUM : group[1]
+            .equals("max") ? AggrType.MAX : null);
+ }
+
+ // Use ExternalFileField?
+ boolean asEff = line.hasOption("asEff");
+ boolean asSequenceFile = line.hasOption("asSequenceFile");
+
+ dumpNodes(new Path(webGraphDb), type, topN, new Path(output), asEff,
+ nameType, aggrType, asSequenceFile);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("NodeDumper: " + StringUtils.stringifyException(e));
+ return -2;
+ }
+ }
+}
\ No newline at end of file
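For reference, a short driver (not part of this commit; the paths are
placeholders) showing how the tool above can be invoked programmatically with
the same flags it accepts on the command line:

import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.scoring.webgraph.NodeDumper;
import org.apache.nutch.util.NutchConfiguration;

public class DumpTopScores {
  public static void main(String[] args) throws Exception {
    // dump the 100 highest-scoring urls from the NodeDb to a text file
    int res = ToolRunner.run(NutchConfiguration.create(), new NodeDumper(),
        new String[] { "-webgraphdb", "crawl/webgraphdb", "-scores",
            "-topn", "100", "-output", "crawl/topscores" });
    System.exit(res);
  }
}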
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java
new file mode 100644
index 0000000..e6b6815
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/NodeReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * Reads and prints to system out information for a single node from the NodeDb
+ * in the WebGraph.
+ */
+public class NodeReader extends Configured {
+
+ private FileSystem fs;
+ private MapFile.Reader[] nodeReaders;
+
+ public NodeReader() {
+
+ }
+
+ public NodeReader(Configuration conf) {
+ super(conf);
+ }
+
+ /**
+ * Prints the content of the Node represented by the url to system out.
+ *
+ * @param webGraphDb
+ * The webgraph from which to get the node.
+ * @param url
+ * The url of the node.
+ *
+ * @throws IOException
+ * If an error occurs while getting the node.
+ */
+ public void dumpUrl(Path webGraphDb, String url) throws IOException {
+
+ fs = FileSystem.get(getConf());
+ nodeReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
+ WebGraph.NODE_DIR), getConf());
+
+ // open the readers, get the node, print out the info, and close the readers
+ Text key = new Text(url);
+ Node node = new Node();
+ MapFileOutputFormat.getEntry(nodeReaders,
+ new HashPartitioner<Text, Node>(), key, node);
+ System.out.println(url + ":");
+ System.out.println(" inlink score: " + node.getInlinkScore());
+ System.out.println(" outlink score: " + node.getOutlinkScore());
+ System.out.println(" num inlinks: " + node.getNumInlinks());
+ System.out.println(" num outlinks: " + node.getNumOutlinks());
+ FSUtils.closeReaders(nodeReaders);
+ }
+
+ /**
+ * Runs the NodeReader tool. The command line arguments must contain a
+ * webgraphdb path and a url. The url must match the normalized url that is
+ * contained in the NodeDb of the WebGraph.
+ */
+ public static void main(String[] args) throws Exception {
+
+ Options options = new Options();
+ OptionBuilder.withArgName("help");
+ OptionBuilder.withDescription("show this help message");
+ Option helpOpts = OptionBuilder.create("help");
+ options.addOption(helpOpts);
+
+ OptionBuilder.withArgName("webgraphdb");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("the webgraphdb to use");
+ Option webGraphOpts = OptionBuilder.create("webgraphdb");
+ options.addOption(webGraphOpts);
+
+ OptionBuilder.withArgName("url");
+ OptionBuilder.hasOptionalArg();
+ OptionBuilder.withDescription("the url to dump");
+ Option urlOpts = OptionBuilder.create("url");
+ options.addOption(urlOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ // command line must take a webgraphdb and a url
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")
+ || !line.hasOption("url")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("WebGraphReader", options);
+ return;
+ }
+
+ // dump the values to system out and return
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ String url = line.getOptionValue("url");
+ NodeReader reader = new NodeReader(NutchConfiguration.create());
+ reader.dumpUrl(new Path(webGraphDb), url);
+
+ return;
+ } catch (Exception e) {
+ e.printStackTrace();
+ return;
+ }
+ }
+
+}
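For reference, a small sketch (not part of this commit; the webgraphdb path
and url are placeholders) of using NodeReader programmatically rather than
through its main() method; note the url must match the normalized form stored
in the NodeDb:

import org.apache.hadoop.fs.Path;
import org.apache.nutch.scoring.webgraph.NodeReader;
import org.apache.nutch.util.NutchConfiguration;

public class PrintNode {
  public static void main(String[] args) throws Exception {
    NodeReader reader = new NodeReader(NutchConfiguration.create());
    // prints the inlink/outlink scores and counts for the node
    reader.dumpUrl(new Path("crawl/webgraphdb"), "http://example.com/");
  }
}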
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
new file mode 100644
index 0000000..19704eb
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/ScoreUpdater.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * Updates the score from the WebGraph node database into the crawl database.
+ * Any score that is not in the node database is set to the clear score in the
+ * crawl database.
+ */
+public class ScoreUpdater extends Configured implements Tool,
+ Mapper<Text, Writable, Text, ObjectWritable>,
+ Reducer<Text, ObjectWritable, Text, CrawlDatum> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ScoreUpdater.class);
+
+ private JobConf conf;
+ private float clearScore = 0.0f;
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ clearScore = conf.getFloat("link.score.updater.clear.score", 0.0f);
+ }
+
+ /**
+ * Changes input into ObjectWritables.
+ */
+ public void map(Text key, Writable value,
+ OutputCollector<Text, ObjectWritable> output, Reporter reporter)
+ throws IOException {
+
+ ObjectWritable objWrite = new ObjectWritable();
+ objWrite.set(value);
+ output.collect(key, objWrite);
+ }
+
+ /**
+ * Creates new CrawlDatum objects with the updated score from the NodeDb or
+ * with a cleared score.
+ */
+ public void reduce(Text key, Iterator<ObjectWritable> values,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
+
+ String url = key.toString();
+ Node node = null;
+ CrawlDatum datum = null;
+
+ // set the node and the crawl datum, should be one of each unless no node
+ // for url in the crawldb
+ while (values.hasNext()) {
+ ObjectWritable next = values.next();
+ Object value = next.get();
+ if (value instanceof Node) {
+ node = (Node) value;
+ } else if (value instanceof CrawlDatum) {
+ datum = (CrawlDatum) value;
+ }
+ }
+
+      // datum should never be null, but could be if the url was somehow
+      // normalized or changed after being pulled from the crawldb
+ if (datum != null) {
+
+ if (node != null) {
+
+ // set the inlink score in the nodedb
+ float inlinkScore = node.getInlinkScore();
+ datum.setScore(inlinkScore);
+ LOG.debug(url + ": setting to score " + inlinkScore);
+ } else {
+
+ // clear out the score in the crawldb
+ datum.setScore(clearScore);
+ LOG.debug(url + ": setting to clear score of " + clearScore);
+ }
+
+ output.collect(key, datum);
+ } else {
+ LOG.debug(url + ": no datum");
+ }
+ }
+
+ public void close() {
+ }
+
+ /**
+   * Updates the inlink score from the web graph node database into the crawl
+   * database.
+ *
+ * @param crawlDb
+ * The crawl database to update
+ * @param webGraphDb
+ * The webgraph database to use.
+ *
+ * @throws IOException
+ * If an error occurs while updating the scores.
+ */
+ public void update(Path crawlDb, Path webGraphDb) throws IOException {
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("ScoreUpdater: starting at " + sdf.format(start));
+
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ // create a temporary crawldb with the new scores
+ LOG.info("Running crawldb update " + crawlDb);
+ Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
+ Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+ Path newCrawlDb = new Path(crawlDb, Integer.toString(new Random()
+ .nextInt(Integer.MAX_VALUE)));
+
+ // run the updater job outputting to the temp crawl database
+ JobConf updater = new NutchJob(conf);
+ updater.setJobName("Update CrawlDb from WebGraph");
+ FileInputFormat.addInputPath(updater, crawlDbCurrent);
+ FileInputFormat.addInputPath(updater, nodeDb);
+ FileOutputFormat.setOutputPath(updater, newCrawlDb);
+ updater.setInputFormat(SequenceFileInputFormat.class);
+ updater.setMapperClass(ScoreUpdater.class);
+ updater.setReducerClass(ScoreUpdater.class);
+ updater.setMapOutputKeyClass(Text.class);
+ updater.setMapOutputValueClass(ObjectWritable.class);
+ updater.setOutputKeyClass(Text.class);
+ updater.setOutputValueClass(CrawlDatum.class);
+ updater.setOutputFormat(MapFileOutputFormat.class);
+
+ try {
+ JobClient.runJob(updater);
+ } catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
+
+ // remove the temp crawldb on error
+ if (fs.exists(newCrawlDb)) {
+ fs.delete(newCrawlDb, true);
+ }
+ throw e;
+ }
+
+ // install the temp crawl database
+ LOG.info("ScoreUpdater: installing new crawldb " + crawlDb);
+ CrawlDb.install(updater, crawlDb);
+
+ long end = System.currentTimeMillis();
+ LOG.info("ScoreUpdater: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new ScoreUpdater(),
+ args);
+ System.exit(res);
+ }
+
+ /**
+ * Runs the ScoreUpdater tool.
+ */
+ public int run(String[] args) throws Exception {
+
+ Options options = new Options();
+ OptionBuilder.withArgName("help");
+ OptionBuilder.withDescription("show this help message");
+ Option helpOpts = OptionBuilder.create("help");
+ options.addOption(helpOpts);
+
+ OptionBuilder.withArgName("crawldb");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("the crawldb to use");
+ Option crawlDbOpts = OptionBuilder.create("crawldb");
+ options.addOption(crawlDbOpts);
+
+ OptionBuilder.withArgName("webgraphdb");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("the webgraphdb to use");
+ Option webGraphOpts = OptionBuilder.create("webgraphdb");
+ options.addOption(webGraphOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")
+ || !line.hasOption("crawldb")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("ScoreUpdater", options);
+ return -1;
+ }
+
+ String crawlDb = line.getOptionValue("crawldb");
+ String webGraphDb = line.getOptionValue("webgraphdb");
+ update(new Path(crawlDb), new Path(webGraphDb));
+ return 0;
+ } catch (Exception e) {
+ LOG.error("ScoreUpdater: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+}
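For reference, a hypothetical driver (the paths are placeholders) that pushes
WebGraph node scores into an existing crawldb, equivalent to running the tool
above with -crawldb and -webgraphdb:

import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.scoring.webgraph.ScoreUpdater;
import org.apache.nutch.util.NutchConfiguration;

public class UpdateScores {
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(NutchConfiguration.create(), new ScoreUpdater(),
        new String[] { "-crawldb", "crawl/crawldb",
            "-webgraphdb", "crawl/webgraphdb" });
    System.exit(res);
  }
}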
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java
new file mode 100644
index 0000000..e2c3d8b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/WebGraph.java
@@ -0,0 +1,783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.scoring.webgraph;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Creates three databases, one for inlinks, one for outlinks, and a node
+ * database that holds the number of in and outlinks to a url and the current
+ * score for the url.
+ *
+ * The score is set by an analysis program such as LinkRank. The WebGraph is an
+ * updatable database. Outlinks are stored by their fetch time or by the
+ * current system time if no fetch time is available. Only the most recent
+ * version of outlinks for a given url is stored. As more crawls are executed
+ * and the WebGraph updated, newer Outlinks will replace older Outlinks. This
+ * allows the WebGraph to adapt to changes in the link structure of the web.
+ *
+ * The Inlink database is created from the Outlink database and is regenerated
+ * when the WebGraph is updated. The Node database is created from both the
+ * Inlink and Outlink databases. Because the Node database is overwritten when
+ * the WebGraph is updated and because the Node database holds current scores
+ * for urls it is recommended that a crawl-cycle (one or more full crawls) fully
+ * complete before the WebGraph is updated and some type of analysis, such as
+ * LinkRank, is run to update scores in the Node database in a stable fashion.
+ */
+public class WebGraph extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory.getLogger(WebGraph.class);
+ public static final String LOCK_NAME = ".locked";
+ public static final String INLINK_DIR = "inlinks";
+ public static final String OUTLINK_DIR = "outlinks/current";
+ public static final String OLD_OUTLINK_DIR = "outlinks/old";
+ public static final String NODE_DIR = "nodes";
+
+ /**
+   * The OutlinkDb creates a database of all outlinks. Outlinks to internal
+   * urls, by domain or by host, can be ignored. The number of outlinks to a
+   * given page or domain can also be limited.
+ */
+ public static class OutlinkDb extends Configured implements
+ Mapper<Text, Writable, Text, NutchWritable>,
+ Reducer<Text, NutchWritable, Text, LinkDatum> {
+
+ public static final String URL_NORMALIZING = "webgraph.url.normalizers";
+ public static final String URL_FILTERING = "webgraph.url.filters";
+
+ // ignoring internal domains, internal hosts
+ private boolean ignoreDomain = true;
+ private boolean ignoreHost = true;
+
+ // limiting urls out to a page or to a domain
+ private boolean limitPages = true;
+ private boolean limitDomains = true;
+
+ // using normalizers and/or filters
+ private boolean normalize = false;
+ private boolean filter = false;
+
+ // url normalizers, filters and job configuration
+ private URLNormalizers urlNormalizers;
+ private URLFilters filters;
+ private JobConf conf;
+
+ /**
+ * Normalizes and trims extra whitespace from the given url.
+ *
+ * @param url
+ * The url to normalize.
+ *
+ * @return The normalized url.
+ */
+ private String normalizeUrl(String url) {
+
+ if (!normalize) {
+ return url;
+ }
+
+ String normalized = null;
+ if (urlNormalizers != null) {
+ try {
+
+ // normalize and trim the url
+ normalized = urlNormalizers.normalize(url,
+ URLNormalizers.SCOPE_DEFAULT);
+ normalized = normalized.trim();
+ } catch (Exception e) {
+ LOG.warn("Skipping " + url + ":" + e);
+ normalized = null;
+ }
+ }
+ return normalized;
+ }
+
+ /**
+ * Filters the given url.
+ *
+ * @param url
+ * The url to filter.
+ *
+ * @return The filtered url or null.
+ */
+ private String filterUrl(String url) {
+
+ if (!filter) {
+ return url;
+ }
+
+ try {
+ url = filters.filter(url);
+ } catch (Exception e) {
+ url = null;
+ }
+
+ return url;
+ }
+
+ /**
+ * Returns the fetch time from the parse data or the current system time if
+ * the fetch time doesn't exist.
+ *
+ * @param data
+ * The parse data.
+ *
+ * @return The fetch time as a long.
+ */
+ private long getFetchTime(ParseData data) {
+
+ // default to current system time
+ long fetchTime = System.currentTimeMillis();
+ String fetchTimeStr = data.getContentMeta().get(Nutch.FETCH_TIME_KEY);
+ try {
+ // get the fetch time from the parse data
+ fetchTime = Long.parseLong(fetchTimeStr);
+ } catch (Exception e) {
+ fetchTime = System.currentTimeMillis();
+ }
+ return fetchTime;
+ }
+
+ /**
+ * Default constructor.
+ */
+ public OutlinkDb() {
+ }
+
+ /**
+ * Configurable constructor.
+ */
+ public OutlinkDb(Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Configures the OutlinkDb job. Sets up internal links and link limiting.
+ */
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ ignoreHost = conf.getBoolean("link.ignore.internal.host", true);
+ ignoreDomain = conf.getBoolean("link.ignore.internal.domain", true);
+ limitPages = conf.getBoolean("link.ignore.limit.page", true);
+ limitDomains = conf.getBoolean("link.ignore.limit.domain", true);
+
+ normalize = conf.getBoolean(URL_NORMALIZING, false);
+ filter = conf.getBoolean(URL_FILTERING, false);
+
+ if (normalize) {
+ urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+ }
+
+ if (filter) {
+ filters = new URLFilters(conf);
+ }
+ }
+
+ /**
+     * Passes through existing LinkDatum objects from an existing OutlinkDb
+     * and maps out new LinkDatum objects from the ParseData of new crawls.
+ */
+ public void map(Text key, Writable value,
+ OutputCollector<Text, NutchWritable> output, Reporter reporter)
+ throws IOException {
+
+ // normalize url, stop processing if null
+ String url = normalizeUrl(key.toString());
+ if (url == null) {
+ return;
+ }
+
+ // filter url
+ if (filterUrl(url) == null) {
+ return;
+ }
+
+ // Overwrite the key with the normalized URL
+ key.set(url);
+
+ if (value instanceof CrawlDatum) {
+ CrawlDatum datum = (CrawlDatum) value;
+
+ if (datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
+ || datum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
+ || datum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
+
+ // Tell the reducer to get rid of all instances of this key
+ output.collect(key, new NutchWritable(new BooleanWritable(true)));
+ }
+ } else if (value instanceof ParseData) {
+ // get the parse data and the outlinks from the parse data, along with
+ // the fetch time for those links
+ ParseData data = (ParseData) value;
+ long fetchTime = getFetchTime(data);
+ Outlink[] outlinkAr = data.getOutlinks();
+ Map<String, String> outlinkMap = new LinkedHashMap<String, String>();
+
+ // normalize urls and put into map
+ if (outlinkAr != null && outlinkAr.length > 0) {
+ for (int i = 0; i < outlinkAr.length; i++) {
+ Outlink outlink = outlinkAr[i];
+ String toUrl = normalizeUrl(outlink.getToUrl());
+
+ if (filterUrl(toUrl) == null) {
+ continue;
+ }
+
+            // only put the url into the map if it isn't there yet, or
+            // replace the existing entry when its anchor is null
+ boolean existingUrl = outlinkMap.containsKey(toUrl);
+ if (toUrl != null
+ && (!existingUrl || (existingUrl && outlinkMap.get(toUrl) == null))) {
+ outlinkMap.put(toUrl, outlink.getAnchor());
+ }
+ }
+ }
+
+ // collect the outlinks under the fetch time
+ for (String outlinkUrl : outlinkMap.keySet()) {
+ String anchor = outlinkMap.get(outlinkUrl);
+ LinkDatum datum = new LinkDatum(outlinkUrl, anchor, fetchTime);
+ output.collect(key, new NutchWritable(datum));
+ }
+ } else if (value instanceof LinkDatum) {
+ LinkDatum datum = (LinkDatum) value;
+ String linkDatumUrl = normalizeUrl(datum.getUrl());
+
+ if (filterUrl(linkDatumUrl) != null) {
+ datum.setUrl(linkDatumUrl);
+
+ // collect existing outlinks from existing OutlinkDb
+ output.collect(key, new NutchWritable(datum));
+ }
+ }
+ }
+
+ public void reduce(Text key, Iterator<NutchWritable> values,
+ OutputCollector<Text, LinkDatum> output, Reporter reporter)
+ throws IOException {
+
+      // aggregate all outlinks and find the most recent fetch timestamp,
+      // which should be shared by all of the most recent outlinks
+ long mostRecent = 0L;
+ List<LinkDatum> outlinkList = new ArrayList<LinkDatum>();
+ while (values.hasNext()) {
+ Writable value = values.next().get();
+
+ if (value instanceof LinkDatum) {
+ // loop through, change out most recent timestamp if needed
+ LinkDatum next = (LinkDatum) value;
+ long timestamp = next.getTimestamp();
+ if (mostRecent == 0L || mostRecent < timestamp) {
+ mostRecent = timestamp;
+ }
+ outlinkList.add(WritableUtils.clone(next, conf));
+ reporter.incrCounter("WebGraph.outlinks", "added links", 1);
+ } else if (value instanceof BooleanWritable) {
+ BooleanWritable delete = (BooleanWritable) value;
+ // Actually, delete is always true, otherwise we don't emit it in the
+ // mapper in the first place
+ if (delete.get() == true) {
+            // This page is gone, do not emit its outlinks
+ reporter.incrCounter("WebGraph.outlinks", "removed links", 1);
+ return;
+ }
+ }
+ }
+
+ // get the url, domain, and host for the url
+ String url = key.toString();
+ String domain = URLUtil.getDomainName(url);
+ String host = URLUtil.getHost(url);
+
+ // setup checking sets for domains and pages
+ Set<String> domains = new HashSet<String>();
+ Set<String> pages = new HashSet<String>();
+
+ // loop through the link datums
+ for (LinkDatum datum : outlinkList) {
+
+ // get the url, host, domain, and page for each outlink
+ String toUrl = datum.getUrl();
+ String toDomain = URLUtil.getDomainName(toUrl);
+ String toHost = URLUtil.getHost(toUrl);
+ String toPage = URLUtil.getPage(toUrl);
+ datum.setLinkType(LinkDatum.OUTLINK);
+
+      // an outlink must be the most recent and conform to the internal-url
+      // and limiting rules; if it does, collect it
+ if (datum.getTimestamp() == mostRecent
+ && (!limitPages || (limitPages && !pages.contains(toPage)))
+ && (!limitDomains || (limitDomains && !domains.contains(toDomain)))
+ && (!ignoreHost || (ignoreHost && !toHost.equalsIgnoreCase(host)))
+ && (!ignoreDomain || (ignoreDomain && !toDomain
+ .equalsIgnoreCase(domain)))) {
+ output.collect(key, datum);
+ pages.add(toPage);
+ domains.add(toDomain);
+ }
+ }
+ }
+
+ public void close() {
+ }
+ }
+
+ /**
+ * The InlinkDb creates a database of Inlinks. Inlinks are inverted from the
+ * OutlinkDb LinkDatum objects and are regenerated each time the WebGraph is
+ * updated.
+ */
+ private static class InlinkDb extends Configured implements
+ Mapper<Text, LinkDatum, Text, LinkDatum> {
+
+ private long timestamp;
+
+ /**
+ * Configures job. Sets timestamp for all Inlink LinkDatum objects to the
+ * current system time.
+ */
+ public void configure(JobConf conf) {
+ timestamp = System.currentTimeMillis();
+ }
+
+ public void close() {
+ }
+
+ /**
+     * Inverts Outlink LinkDatum objects into new LinkDatum objects with a
+     * new system timestamp and link type, and with the to and from urls
+     * swapped.
+ */
+ public void map(Text key, LinkDatum datum,
+ OutputCollector<Text, LinkDatum> output, Reporter reporter)
+ throws IOException {
+
+ // get the to and from url and the anchor
+ String fromUrl = key.toString();
+ String toUrl = datum.getUrl();
+ String anchor = datum.getAnchor();
+
+ // flip the from and to url and set the new link type
+ LinkDatum inlink = new LinkDatum(fromUrl, anchor, timestamp);
+ inlink.setLinkType(LinkDatum.INLINK);
+ output.collect(new Text(toUrl), inlink);
+ }
+ }
+
+ /**
+ * Creates the Node database which consists of the number of in and outlinks
+ * for each url and a score slot for analysis programs such as LinkRank.
+ */
+ private static class NodeDb extends Configured implements
+ Reducer<Text, LinkDatum, Text, Node> {
+
+ /**
+ * Configures job.
+ */
+ public void configure(JobConf conf) {
+ }
+
+ public void close() {
+ }
+
+ /**
+ * Counts the number of inlinks and outlinks for each url and sets a default
+ * score of 0.0 for each url (node) in the webgraph.
+ */
+ public void reduce(Text key, Iterator<LinkDatum> values,
+ OutputCollector<Text, Node> output, Reporter reporter)
+ throws IOException {
+
+ Node node = new Node();
+ int numInlinks = 0;
+ int numOutlinks = 0;
+
+ // loop through counting number of in and out links
+ while (values.hasNext()) {
+ LinkDatum next = values.next();
+ if (next.getLinkType() == LinkDatum.INLINK) {
+ numInlinks++;
+ } else if (next.getLinkType() == LinkDatum.OUTLINK) {
+ numOutlinks++;
+ }
+ }
+
+ // set the in and outlinks and a default score of 0
+ node.setNumInlinks(numInlinks);
+ node.setNumOutlinks(numOutlinks);
+ node.setInlinkScore(0.0f);
+ output.collect(key, node);
+ }
+ }
+
+ /**
+ * Creates the three different WebGraph databases, Outlinks, Inlinks, and
+   * Node. If a current WebGraph exists it is updated; if it doesn't exist, a
+   * new WebGraph database is created.
+ *
+ * @param webGraphDb
+ * The WebGraph to create or update.
+ * @param segments
+ * The array of segments used to update the WebGraph. Newer segments
+ * and fetch times will overwrite older segments.
+ * @param normalize
+ * whether to use URLNormalizers on URL's in the segment
+ * @param filter
+ * whether to use URLFilters on URL's in the segment
+ *
+ * @throws IOException
+ * If an error occurs while processing the WebGraph.
+ */
+ public void createWebGraph(Path webGraphDb, Path[] segments,
+ boolean normalize, boolean filter) throws IOException {
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("WebGraphDb: starting at " + sdf.format(start));
+ LOG.info("WebGraphDb: webgraphdb: " + webGraphDb);
+ LOG.info("WebGraphDb: URL normalize: " + normalize);
+ LOG.info("WebGraphDb: URL filter: " + filter);
+ }
+
+ Configuration conf = getConf();
+ FileSystem fs = FileSystem.get(conf);
+
+ // lock an existing webgraphdb to prevent multiple simultaneous updates
+ Path lock = new Path(webGraphDb, LOCK_NAME);
+ if (!fs.exists(webGraphDb)) {
+ fs.mkdirs(webGraphDb);
+ }
+
+ LockUtil.createLockFile(fs, lock, false);
+
+ // outlink and temp outlink database paths
+ Path outlinkDb = new Path(webGraphDb, OUTLINK_DIR);
+ Path oldOutlinkDb = new Path(webGraphDb, OLD_OUTLINK_DIR);
+
+ if (!fs.exists(outlinkDb)) {
+ fs.mkdirs(outlinkDb);
+ }
+
+ Path tempOutlinkDb = new Path(outlinkDb + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+ JobConf outlinkJob = new NutchJob(conf);
+ outlinkJob.setJobName("Outlinkdb: " + outlinkDb);
+
+ boolean deleteGone = conf.getBoolean("link.delete.gone", false);
+ boolean preserveBackup = conf.getBoolean("db.preserve.backup", true);
+
+ if (deleteGone) {
+ LOG.info("OutlinkDb: deleting gone links");
+ }
+
+ // get the parse data and crawl fetch data for all segments
+ if (segments != null) {
+ for (int i = 0; i < segments.length; i++) {
+ Path parseData = new Path(segments[i], ParseData.DIR_NAME);
+ if (fs.exists(parseData)) {
+ LOG.info("OutlinkDb: adding input: " + parseData);
+ FileInputFormat.addInputPath(outlinkJob, parseData);
+ }
+
+ if (deleteGone) {
+ Path crawlFetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
+ if (fs.exists(crawlFetch)) {
+ LOG.info("OutlinkDb: adding input: " + crawlFetch);
+ FileInputFormat.addInputPath(outlinkJob, crawlFetch);
+ }
+ }
+ }
+ }
+
+ // add the existing webgraph
+ LOG.info("OutlinkDb: adding input: " + outlinkDb);
+ FileInputFormat.addInputPath(outlinkJob, outlinkDb);
+
+ outlinkJob.setBoolean(OutlinkDb.URL_NORMALIZING, normalize);
+ outlinkJob.setBoolean(OutlinkDb.URL_FILTERING, filter);
+
+ outlinkJob.setInputFormat(SequenceFileInputFormat.class);
+ outlinkJob.setMapperClass(OutlinkDb.class);
+ outlinkJob.setReducerClass(OutlinkDb.class);
+ outlinkJob.setMapOutputKeyClass(Text.class);
+ outlinkJob.setMapOutputValueClass(NutchWritable.class);
+ outlinkJob.setOutputKeyClass(Text.class);
+ outlinkJob.setOutputValueClass(LinkDatum.class);
+ FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
+ outlinkJob.setOutputFormat(MapFileOutputFormat.class);
+ outlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+ false);
+
+ // run the outlinkdb job and replace any old outlinkdb with the new one
+ try {
+ LOG.info("OutlinkDb: running");
+ JobClient.runJob(outlinkJob);
+ LOG.info("OutlinkDb: installing " + outlinkDb);
+ FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true);
+ FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
+ if (!preserveBackup && fs.exists(oldOutlinkDb))
+ fs.delete(oldOutlinkDb, true);
+ LOG.info("OutlinkDb: finished");
+ } catch (IOException e) {
+
+      // remove the lock file and temporary directory if an error occurs
+ LockUtil.removeLockFile(fs, lock);
+ if (fs.exists(tempOutlinkDb)) {
+ fs.delete(tempOutlinkDb, true);
+ }
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ // inlink and temp link database paths
+ Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
+ Path tempInlinkDb = new Path(inlinkDb + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf inlinkJob = new NutchJob(conf);
+ inlinkJob.setJobName("Inlinkdb " + inlinkDb);
+ LOG.info("InlinkDb: adding input: " + outlinkDb);
+ FileInputFormat.addInputPath(inlinkJob, outlinkDb);
+ inlinkJob.setInputFormat(SequenceFileInputFormat.class);
+ inlinkJob.setMapperClass(InlinkDb.class);
+ inlinkJob.setMapOutputKeyClass(Text.class);
+ inlinkJob.setMapOutputValueClass(LinkDatum.class);
+ inlinkJob.setOutputKeyClass(Text.class);
+ inlinkJob.setOutputValueClass(LinkDatum.class);
+ FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
+ inlinkJob.setOutputFormat(MapFileOutputFormat.class);
+ inlinkJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+ false);
+
+ try {
+
+ // run the inlink and replace any old with new
+ LOG.info("InlinkDb: running");
+ JobClient.runJob(inlinkJob);
+ LOG.info("InlinkDb: installing " + inlinkDb);
+ FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
+ LOG.info("InlinkDb: finished");
+ } catch (IOException e) {
+
+      // remove the lock file and temporary directory if an error occurs
+ LockUtil.removeLockFile(fs, lock);
+ if (fs.exists(tempInlinkDb)) {
+ fs.delete(tempInlinkDb, true);
+ }
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ // node and temp node database paths
+ Path nodeDb = new Path(webGraphDb, NODE_DIR);
+ Path tempNodeDb = new Path(nodeDb + "-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf nodeJob = new NutchJob(conf);
+ nodeJob.setJobName("NodeDb " + nodeDb);
+ LOG.info("NodeDb: adding input: " + outlinkDb);
+ LOG.info("NodeDb: adding input: " + inlinkDb);
+ FileInputFormat.addInputPath(nodeJob, outlinkDb);
+ FileInputFormat.addInputPath(nodeJob, inlinkDb);
+ nodeJob.setInputFormat(SequenceFileInputFormat.class);
+ nodeJob.setReducerClass(NodeDb.class);
+ nodeJob.setMapOutputKeyClass(Text.class);
+ nodeJob.setMapOutputValueClass(LinkDatum.class);
+ nodeJob.setOutputKeyClass(Text.class);
+ nodeJob.setOutputValueClass(Node.class);
+ FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
+ nodeJob.setOutputFormat(MapFileOutputFormat.class);
+ nodeJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+ false);
+
+ try {
+
+ // run the node job and replace old nodedb with new
+ LOG.info("NodeDb: running");
+ JobClient.runJob(nodeJob);
+ LOG.info("NodeDb: installing " + nodeDb);
+ FSUtils.replace(fs, nodeDb, tempNodeDb, true);
+ LOG.info("NodeDb: finished");
+ } catch (IOException e) {
+
+      // remove the lock file and temporary directory if an error occurs
+ LockUtil.removeLockFile(fs, lock);
+ if (fs.exists(tempNodeDb)) {
+ fs.delete(tempNodeDb, true);
+ }
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ }
+
+ // remove the lock file for the webgraph
+ LockUtil.removeLockFile(fs, lock);
+
+ long end = System.currentTimeMillis();
+ LOG.info("WebGraphDb: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new WebGraph(), args);
+ System.exit(res);
+ }
+
+ /**
+   * Parses command line arguments and runs the WebGraph jobs.
+ */
+ public int run(String[] args) throws Exception {
+
+ // boolean options
+ Option helpOpt = new Option("h", "help", false, "show this help message");
+ Option normOpt = new Option("n", "normalize", false,
+ "whether to use URLNormalizers on the URL's in the segment");
+ Option filtOpt = new Option("f", "filter", false,
+ "whether to use URLFilters on the URL's in the segment");
+
+ // argument options
+ @SuppressWarnings("static-access")
+ Option graphOpt = OptionBuilder
+ .withArgName("webgraphdb")
+ .hasArg()
+ .withDescription(
+ "the web graph database to create (if none exists) or use if one does")
+ .create("webgraphdb");
+ @SuppressWarnings("static-access")
+ Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
+ .withDescription("the segment(s) to use").create("segment");
+ @SuppressWarnings("static-access")
+ Option segDirOpt = OptionBuilder.withArgName("segmentDir").hasArgs()
+ .withDescription("the segment directory to use").create("segmentDir");
+
+ // create the options
+ Options options = new Options();
+ options.addOption(helpOpt);
+ options.addOption(normOpt);
+ options.addOption(filtOpt);
+ options.addOption(graphOpt);
+ options.addOption(segOpt);
+ options.addOption(segDirOpt);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("webgraphdb")
+ || (!line.hasOption("segment") && !line.hasOption("segmentDir"))) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("WebGraph", options, true);
+ return -1;
+ }
+
+ String webGraphDb = line.getOptionValue("webgraphdb");
+
+ Path[] segPaths = null;
+
+ // Handle segment option
+ if (line.hasOption("segment")) {
+ String[] segments = line.getOptionValues("segment");
+ segPaths = new Path[segments.length];
+ for (int i = 0; i < segments.length; i++) {
+ segPaths[i] = new Path(segments[i]);
+ }
+ }
+
+ // Handle segmentDir option
+ if (line.hasOption("segmentDir")) {
+ Path dir = new Path(line.getOptionValue("segmentDir"));
+ FileSystem fs = dir.getFileSystem(getConf());
+ FileStatus[] fstats = fs.listStatus(dir,
+ HadoopFSUtil.getPassDirectoriesFilter(fs));
+ segPaths = HadoopFSUtil.getPaths(fstats);
+ }
+
+ boolean normalize = false;
+
+ if (line.hasOption("normalize")) {
+ normalize = true;
+ }
+
+ boolean filter = false;
+
+ if (line.hasOption("filter")) {
+ filter = true;
+ }
+
+ createWebGraph(new Path(webGraphDb), segPaths, normalize, filter);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("WebGraph: " + StringUtils.stringifyException(e));
+ return -2;
+ }
+ }
+
+}
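The run() method above accepts -webgraphdb, -segment/-segmentDir, -normalize
and -filter, so the tool can be driven from other Java code as well as from
the command line. A minimal sketch of a programmatic invocation; the paths
and the wrapper class name are hypothetical, not part of this patch:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.scoring.webgraph.WebGraph;
    import org.apache.nutch.util.NutchConfiguration;

    public class WebGraphRunner {
      public static void main(String[] args) throws Exception {
        // same effect as passing these flags on the command line
        int res = ToolRunner.run(NutchConfiguration.create(), new WebGraph(),
            new String[] { "-webgraphdb", "crawl/webgraphdb",
                "-segmentDir", "crawl/segments", "-normalize", "-filter" });
        System.exit(res);
      }
    }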
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java
new file mode 100644
index 0000000..a568b46
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/scoring/webgraph/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Scoring implementation based on link analysis
+ * ({@link org.apache.nutch.scoring.webgraph.LinkRank}),
+ * see {@link org.apache.nutch.scoring.webgraph.WebGraph}.
+ */
+package org.apache.nutch.scoring.webgraph;
+
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java b/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
new file mode 100644
index 0000000..d67b590
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/ContentAsTextInputFormat.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * An input format that reads Nutch Content objects and converts them to
+ * text, replacing newlines with spaces so that each record fits on a single
+ * line. This makes Nutch content easy to consume from Hadoop Streaming jobs
+ * written in other languages.
+ */
+public class ContentAsTextInputFormat extends
+ SequenceFileInputFormat<Text, Text> {
+
+ private static class ContentAsTextRecordReader implements
+ RecordReader<Text, Text> {
+
+ private final SequenceFileRecordReader<Text, Content> sequenceFileRecordReader;
+
+ private Text innerKey;
+ private Content innerValue;
+
+ public ContentAsTextRecordReader(Configuration conf, FileSplit split)
+ throws IOException {
+ sequenceFileRecordReader = new SequenceFileRecordReader<Text, Content>(
+ conf, split);
+ innerKey = sequenceFileRecordReader.createKey();
+ innerValue = sequenceFileRecordReader.createValue();
+ }
+
+ public Text createKey() {
+ return new Text();
+ }
+
+ public Text createValue() {
+ return new Text();
+ }
+
+ public synchronized boolean next(Text key, Text value) throws IOException {
+
+ if (!sequenceFileRecordReader.next(innerKey, innerValue)) {
+ return false;
+ }
+
+ // convert the content object to text; the raw bytes are decoded with
+ // the platform default charset
+ key.set(innerKey.toString());
+ String contentAsStr = new String(innerValue.getContent());
+
+ // replace newlines with spaces so each record stays on a single line
+ contentAsStr = contentAsStr.replaceAll("\n", " ");
+ value.set(contentAsStr);
+
+ return true;
+ }
+
+ public float getProgress() throws IOException {
+ return sequenceFileRecordReader.getProgress();
+ }
+
+ public synchronized long getPos() throws IOException {
+ return sequenceFileRecordReader.getPos();
+ }
+
+ public synchronized void close() throws IOException {
+ sequenceFileRecordReader.close();
+ }
+ }
+
+ public ContentAsTextInputFormat() {
+ super();
+ }
+
+ public RecordReader<Text, Text> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+
+ reporter.setStatus(split.toString());
+ return new ContentAsTextRecordReader(job, (FileSplit) split);
+ }
+}
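A minimal sketch of wiring this format into a job on the old mapred API, so
that the identity mapper emits one <url, single-line content> pair per page;
the input segment and output paths are hypothetical:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.nutch.segment.ContentAsTextInputFormat;

    public class ContentAsTextJob {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(ContentAsTextJob.class);
        job.setJobName("content-as-text");
        // records arrive as <Text url, Text one-line content> pairs
        job.setInputFormat(ContentAsTextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,
            new Path("crawl/segments/20160716194855/content"));
        FileOutputFormat.setOutputPath(job, new Path("content-as-text"));
        JobClient.runJob(job);
      }
    }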
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java
new file mode 100644
index 0000000..ec601f4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentChecker.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks whether a segment is valid, or has a certain status (generated,
+ * fetched, parsed), or can be used safely for a certain processing step
+ * (e.g., indexing).
+ */
+public class SegmentChecker {
+
+ public static final Logger LOG = LoggerFactory
+ .getLogger(SegmentChecker.class);
+
+ /**
+ * Checks whether the segment is indexable. New checks can be added here.
+ */
+ public static boolean isIndexable(Path segmentPath, FileSystem fs)
+ throws IOException {
+ if (segmentPath == null || fs == null) {
+ LOG.info("No segment path or filesystem set.");
+ return false;
+ }
+
+ boolean checkResult = true;
+ checkResult &= checkSegmentDir(segmentPath, fs);
+ // add new check methods here
+
+ return checkResult;
+ }
+
+ /**
+ * Checks whether the segment is valid based on its subdirectories.
+ */
+ public static boolean checkSegmentDir(Path segmentPath, FileSystem fs)
+ throws IOException {
+
+ if (segmentPath.getName().length() != 14) {
+ LOG.warn("The input path at {} is not a segment... skipping", segmentPath.getName());
+ return false;
+ }
+
+ FileStatus[] fstats_segment = fs.listStatus(segmentPath,
+ HadoopFSUtil.getPassDirectoriesFilter(fs));
+ Path[] segment_files = HadoopFSUtil.getPaths(fstats_segment);
+
+ boolean crawlFetchExists = false;
+ boolean crawlParseExists = false;
+ boolean parseDataExists = false;
+ boolean parseTextExists = false;
+
+ for (Path path : segment_files) {
+ String pathName = path.getName();
+ crawlFetchExists |= pathName.equals(CrawlDatum.FETCH_DIR_NAME);
+ crawlParseExists |= pathName.equals(CrawlDatum.PARSE_DIR_NAME);
+ parseDataExists |= pathName.equals(ParseData.DIR_NAME);
+ parseTextExists |= pathName.equals(ParseText.DIR_NAME);
+ }
+
+ if (parseTextExists && crawlParseExists && crawlFetchExists
+ && parseDataExists) {
+
+ // No segment dir missing
+ LOG.info("Segment dir is complete: " + segmentPath.toString() + ".");
+
+ return true;
+ } else {
+
+ // log the missing directories
+ StringBuilder missingDir = new StringBuilder();
+ if (!parseDataExists) {
+ missingDir.append(ParseData.DIR_NAME + ", ");
+ }
+ if (!parseTextExists) {
+ missingDir.append(ParseText.DIR_NAME + ", ");
+ }
+ if (!crawlParseExists) {
+ missingDir.append(CrawlDatum.PARSE_DIR_NAME + ", ");
+ }
+ if (!crawlFetchExists) {
+ missingDir.append(CrawlDatum.FETCH_DIR_NAME + ", ");
+ }
+
+ String missingDirString = missingDir.toString();
+ LOG.warn("Skipping segment: " + segmentPath.toString()
+ + ". Missing sub directories: "
+ + missingDirString.substring(0, missingDirString.length() - 2));
+
+ return false;
+ }
+
+ }
+
+ /**
+ * Checks whether the segment has been parsed before.
+ */
+ public static boolean isParsed(Path segment, FileSystem fs)
+ throws IOException {
+
+ return fs.exists(new Path(segment, CrawlDatum.PARSE_DIR_NAME));
+ }
+
+}
\ No newline at end of file
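A short sketch of using these checks before indexing a segment; the segment
path is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.segment.SegmentChecker;
    import org.apache.nutch.util.NutchConfiguration;

    public class CheckSegment {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        Path segment = new Path("crawl/segments/20160716194855");
        FileSystem fs = segment.getFileSystem(conf);
        if (SegmentChecker.isIndexable(segment, fs)) {
          System.out.println("segment is complete and safe to index");
        } else if (SegmentChecker.isParsed(segment, fs)) {
          System.out.println("segment is parsed but incomplete");
        } else {
          System.out.println("segment has not been parsed yet");
        }
      }
    }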
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java
new file mode 100644
index 0000000..6d53809
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.util.Collection;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Interface used to filter segments during segment merge. It allows filtering
+ * on more sophisticated criteria than just URLs. In particular, it allows
+ * filtering based on metadata collected while parsing the page.
+ */
+public interface SegmentMergeFilter {
+ /** The name of the extension point. */
+ public static final String X_POINT_ID = SegmentMergeFilter.class.getName();
+
+ /**
+ * The filtering method which gets all information being merged for a given
+ * key (URL).
+ *
+ * @return <tt>true</tt> if the values for this <tt>key</tt> (URL) should be
+ * merged into the new segment, <tt>false</tt> otherwise.
+ */
+ public boolean filter(Text key, CrawlDatum generateData,
+ CrawlDatum fetchData, CrawlDatum sigData, Content content,
+ ParseData parseData, ParseText parseText, Collection<CrawlDatum> linked);
+}
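To make the contract concrete, a hedged sketch of an implementation that
drops URLs flagged as duplicates in the parse metadata; the "duplicate"
metadata key and the class name are hypothetical, and a real extension would
also need the usual plugin.xml registration:

    import java.util.Collection;

    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.parse.ParseData;
    import org.apache.nutch.parse.ParseText;
    import org.apache.nutch.protocol.Content;
    import org.apache.nutch.segment.SegmentMergeFilter;

    public class DropMarkedDuplicatesFilter implements SegmentMergeFilter {

      public boolean filter(Text key, CrawlDatum generateData,
          CrawlDatum fetchData, CrawlDatum sigData, Content content,
          ParseData parseData, ParseText parseText,
          Collection<CrawlDatum> linked) {
        // keep the URL unless the parser flagged it as a duplicate
        // ("duplicate" is an assumed, application-specific metadata key)
        return parseData == null
            || !"true".equals(parseData.getParseMeta().get("duplicate"));
      }
    }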
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java
new file mode 100644
index 0000000..7aa2de3
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMergeFilters.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.util.Collection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.plugin.Extension;
+import org.apache.nutch.plugin.ExtensionPoint;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.plugin.PluginRuntimeException;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * This class wraps all {@link SegmentMergeFilter} extensions in a single object
+ * so it is easier to operate on them. If any of the extensions returns
+ * <tt>false</tt>, this wrapper returns <tt>false</tt> as well.
+ */
+public class SegmentMergeFilters {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SegmentMergeFilters.class);
+ private SegmentMergeFilter[] filters;
+
+ public SegmentMergeFilters(Configuration conf) {
+ try {
+ ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
+ SegmentMergeFilter.X_POINT_ID);
+ if (point == null)
+ throw new RuntimeException(SegmentMergeFilter.X_POINT_ID + " not found.");
+ Extension[] extensions = point.getExtensions();
+ filters = new SegmentMergeFilter[extensions.length];
+ for (int i = 0; i < extensions.length; i++) {
+ filters[i] = (SegmentMergeFilter) extensions[i].getExtensionInstance();
+ }
+ } catch (PluginRuntimeException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Iterates over all {@link SegmentMergeFilter} extensions; if any of them
+ * returns <tt>false</tt>, this method returns <tt>false</tt> as well.
+ *
+ * @return <tt>true</tt> if the values for this <tt>key</tt> (URL) should be
+ * merged into the new segment, <tt>false</tt> otherwise.
+ */
+ public boolean filter(Text key, CrawlDatum generateData,
+ CrawlDatum fetchData, CrawlDatum sigData, Content content,
+ ParseData parseData, ParseText parseText, Collection<CrawlDatum> linked) {
+ for (SegmentMergeFilter filter : filters) {
+ if (!filter.filter(key, generateData, fetchData, sigData, content,
+ parseData, parseText, linked)) {
+ if (LOG.isTraceEnabled())
+ LOG.trace("Key " + key + " dropped by " + filter.getClass().getName());
+ return false;
+ }
+ }
+ if (LOG.isTraceEnabled())
+ LOG.trace("Key " + key + " accepted for merge.");
+ return true;
+ }
+}
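Usage from a merge job then reduces to constructing the wrapper once and
calling filter() per key. A sketch under the assumption that the caller has
already grouped the per-URL parts (the class and variable names are
illustrative):

    import java.util.Collection;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.parse.ParseData;
    import org.apache.nutch.parse.ParseText;
    import org.apache.nutch.protocol.Content;
    import org.apache.nutch.segment.SegmentMergeFilters;

    public class MergeFilterHelper {

      private final SegmentMergeFilters mergeFilters;

      public MergeFilterHelper(Configuration conf) {
        // loads every registered SegmentMergeFilter extension
        this.mergeFilters = new SegmentMergeFilters(conf);
      }

      /** Returns true if all extensions agree to keep this URL. */
      public boolean keep(Text key, CrawlDatum generateData,
          CrawlDatum fetchData, CrawlDatum sigData, Content content,
          ParseData parseData, ParseText parseText,
          Collection<CrawlDatum> linked) {
        return mergeFilters.filter(key, generateData, fetchData, sigData,
            content, parseData, parseText, linked);
      }
    }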