Posted to commits@nutch.apache.org by ma...@apache.org on 2016/01/21 14:58:56 UTC
svn commit: r1725952 - in /nutch/trunk: ./ conf/ ivy/ src/bin/
src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/hostdb/
Author: markus
Date: Thu Jan 21 13:58:56 2016
New Revision: 1725952
URL: http://svn.apache.org/viewvc?rev=1725952&view=rev
Log:
NUTCH-1325 HostDB for Nutch
Added:
nutch/trunk/src/java/org/apache/nutch/hostdb/
nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java
nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java
nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java
nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/conf/log4j.properties
nutch/trunk/conf/nutch-default.xml
nutch/trunk/ivy/ivy.xml
nutch/trunk/src/bin/nutch
nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Thu Jan 21 13:58:56 2016
@@ -1,5 +1,7 @@
Nutch Change Log
+* NUTCH-1325 HostDB for Nutch (Gui Forget, markus, tejasp)
+
* NUTCH-2203 Suffix URL filter can't handle trailing/leading whitespaces (Jurian Broertjes via markus)
* NUTCH-2194 Run IndexingFilterChecker as simple Telnet server (markus)
Modified: nutch/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/nutch/trunk/conf/log4j.properties?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/conf/log4j.properties (original)
+++ nutch/trunk/conf/log4j.properties Thu Jan 21 13:58:56 2016
@@ -57,6 +57,8 @@ log4j.logger.org.apache.nutch.segment.Se
log4j.logger.org.apache.nutch.segment.SegmentReader=INFO,cmdstdout
log4j.logger.org.apache.nutch.tools.FreeGenerator=INFO,cmdstdout
log4j.logger.org.apache.nutch.util.domain.DomainStatistics=INFO,cmdstdout
+log4j.logger.org.apache.nutch.hostdb.UpdateHostDb=INFO,cmdstdout
+log4j.logger.org.apache.nutch.hostdb.ReadHostDb=INFO,cmdstdout
log4j.logger.org.apache.nutch=INFO
log4j.logger.org.apache.hadoop=WARN
Modified: nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Thu Jan 21 13:58:56 2016
@@ -1951,4 +1951,112 @@ CAUTION: Set the parser.timeout to -1 or
</description>
</property>
+<!-- HostDB settings -->
+<property>
+ <name>hostdb.recheck.interval</name>
+ <value>86400000</value>
+ <description>
+ Interval between rechecks in milliseconds. The default is one day
+ (86400000 milliseconds). The recheck interval is multiplied by the
+ number of DNS lookup failures for a given host.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.purge.failed.hosts.threshold</name>
+ <value>3</value>
+ <description>
+ If hosts have more failed DNS lookups than this threshold, they are
+ removed from the HostDB. Hosts can, of course, return if they are still
+ present in the CrawlDB.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.num.resolvers.threads</name>
+ <value>25</value>
+ <description>
+ Number of resolver threads per reducer. Make sure your DNS resolver is
+ capable of handling this value multiplied by the number of reducers.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.check.failed</name>
+ <value>true</value>
+ <description>
+ If true, hosts for which DNS lookup previously failed are eligible for
+ recheck. If false, hosts that have failed DNS lookup one or more times
+ are never rechecked.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.check.new</name>
+ <value>true</value>
+ <description>
+ If true, newly discovered hosts are eligible for DNS lookup. If false,
+ hosts that have just been added to the HostDB are not checked.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.check.known</name>
+ <value>true</value>
+ <description>
+ If true, already known hosts are eligible for DNS lookup. If false,
+ known hosts are not rechecked.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.force.check</name>
+ <value>false</value>
+ <description>
+ If true, hosts are checked regardless of their respective recheck
+ intervals or status.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.url.filter</name>
+ <value>false</value>
+ <description>
+ Whether the records are to be passed through configured filters.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.url.normalize</name>
+ <value>false</value>
+ <description>
+ Whether the records are to be passed through configured normalizers.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.numeric.fields</name>
+ <value>_rs_</value>
+ <description>
+ Comma-separated list of CrawlDatum metadata fields for which numeric
+ aggregations (minimum, maximum, average and percentiles) are calculated.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.string.fields</name>
+ <value>Content-Type</value>
+ <description>
+ Comma-separated list of CrawlDatum metadata fields for which value
+ counts are calculated.
+ </description>
+</property>
+
+<property>
+ <name>hostdb.percentiles</name>
+ <value>50,75,95,99</value>
+ <description>
+ Comma-separated list of percentiles that must be calculated for all numeric
+ field aggregations. Host metadata will contain fields for each percentile.
+ </description>
+</property>
+
</configuration>
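
Note on the aggregation settings above: for every field listed in
hostdb.numeric.fields the reducer below emits min.<field>, max.<field>,
avg.<field> and one pctN.<field> entry per configured percentile; for every
field in hostdb.string.fields it emits one counter per observed value,
e.g. Content-Type.text/html. As usual, these defaults can be overridden in
nutch-site.xml; a hypothetical override limiting the percentiles:

<property>
  <name>hostdb.percentiles</name>
  <value>50,90</value>
</property>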
Modified: nutch/trunk/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/ivy/ivy.xml?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/ivy/ivy.xml (original)
+++ nutch/trunk/ivy/ivy.xml Thu Jan 21 13:58:56 2016
@@ -49,6 +49,7 @@
<dependency org="commons-codec" name="commons-codec" rev="1.10" conf="*->default" />
<dependency org="org.apache.commons" name="commons-compress" rev="1.9" conf="*->default" />
<dependency org="org.apache.commons" name="commons-jexl" rev="2.1.1" />
+ <dependency org="com.tdunning" name="t-digest" rev="3.1" />
<!-- Hadoop Dependencies -->
<dependency org="org.apache.hadoop" name="hadoop-common" rev="2.4.0" conf="*->default">
Modified: nutch/trunk/src/bin/nutch
URL: http://svn.apache.org/viewvc/nutch/trunk/src/bin/nutch?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/src/bin/nutch (original)
+++ nutch/trunk/src/bin/nutch Thu Jan 21 13:58:56 2016
@@ -90,7 +90,9 @@ if [ $# = 0 ]; then
echo " junit runs the given JUnit test"
echo " startserver runs the Nutch Server on localhost:8081"
echo " webapp run a local Nutch Web Application on locahost:8080"
- echo " warc exports crawled data from segments at the WARC format"
+ echo " warc exports crawled data from segments at the WARC format"
+ echo " updatehostdb update the host db with records from the crawl db"
+ echo " readhostdb read / dump host db"
echo " or"
echo " CLASSNAME run the class named CLASSNAME"
echo "Most commands print help when invoked w/o parameters."
@@ -290,6 +292,10 @@ elif [ "$COMMAND" = "webapp" ] ; then
CLASS=org.apache.nutch.webui.NutchUiServer
elif [ "$COMMAND" = "warc" ] ; then
CLASS=org.apache.nutch.tools.warc.WARCExporter
+elif [ "$COMMAND" = "updatehostdb" ] ; then
+ CLASS=org.apache.nutch.hostdb.UpdateHostDb
+elif [ "$COMMAND" = "readhostdb" ] ; then
+ CLASS=org.apache.nutch.hostdb.ReadHostDb
else
CLASS=$COMMAND
fi
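
Usage note: the two new commands map onto the classes added below.
Illustrative invocations (the paths are examples, not defaults):

  bin/nutch updatehostdb -hostdb crawl/hostdb -crawldb crawl/crawldb -checkAll
  bin/nutch readhostdb crawl/hostdb crawl/hostdb-dump -dumpHostnames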
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java Thu Jan 21 13:58:56 2016
@@ -47,7 +47,8 @@ public class NutchWritable extends Gener
org.apache.nutch.parse.ParseStatus.class,
org.apache.nutch.protocol.Content.class,
org.apache.nutch.protocol.ProtocolStatus.class,
- org.apache.nutch.scoring.webgraph.LinkDatum.class };
+ org.apache.nutch.scoring.webgraph.LinkDatum.class,
+ org.apache.nutch.hostdb.HostDatum.class };
}
public NutchWritable() {
Added: nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map.Entry;
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Container for host-level information: aggregated CrawlDatum status
+ * counts, DNS and connection failure counts, score, homepage URL and
+ * arbitrary metadata.
+ */
+public class HostDatum implements Writable, Cloneable {
+ protected int failures = 0;
+ protected float score = 0;
+ protected Date lastCheck = new Date(0);
+ protected String homepageUrl = "";
+
+ protected MapWritable metaData = new MapWritable();
+
+ // Records the number of times DNS look-up failed, may indicate host no longer exists
+ protected int dnsFailures = 0;
+
+ // Records the number of connection failures, may indicate our network is blocked by a firewall
+ protected int connectionFailures = 0;
+
+ protected int unfetched = 0;
+ protected int fetched = 0;
+ protected int notModified = 0;
+ protected int redirTemp = 0;
+ protected int redirPerm = 0;
+ protected int gone = 0;
+
+ public HostDatum() {
+ }
+
+ public HostDatum(float score) {
+ this(score, new Date());
+ }
+
+ public HostDatum(float score, Date lastCheck) {
+ this(score, lastCheck, "");
+ }
+
+ public HostDatum(float score, Date lastCheck, String homepageUrl) {
+ this.score = score;
+ this.lastCheck = lastCheck;
+ this.homepageUrl = homepageUrl;
+ }
+
+ public void resetFailures() {
+ setDnsFailures(0);
+ setConnectionFailures(0);
+ }
+
+ public void setDnsFailures(Integer dnsFailures) {
+ this.dnsFailures = dnsFailures;
+ }
+
+ public void setConnectionFailures(Integer connectionFailures) {
+ this.connectionFailures = connectionFailures;
+ }
+
+ public void incDnsFailures() {
+ this.dnsFailures++;
+ }
+
+ public void incConnectionFailures() {
+ this.connectionFailures++;
+ }
+
+ public Integer numFailures() {
+ return getDnsFailures() + getConnectionFailures();
+ }
+
+ public Integer getDnsFailures() {
+ return dnsFailures;
+ }
+
+ public Integer getConnectionFailures() {
+ return connectionFailures;
+ }
+
+ public void setScore(float score) {
+ this.score = score;
+ }
+
+ public void setLastCheck() {
+ setLastCheck(new Date());
+ }
+
+ public void setLastCheck(Date date) {
+ lastCheck = date;
+ }
+
+ public boolean isEmpty() {
+ return lastCheck.getTime() == 0;
+ }
+
+ public float getScore() {
+ return score;
+ }
+
+ public Integer numRecords() {
+ return unfetched + fetched + gone + redirPerm + redirTemp + notModified;
+ }
+
+ public Date getLastCheck() {
+ return lastCheck;
+ }
+
+ public boolean hasHomepageUrl() {
+ return homepageUrl.length() > 0;
+ }
+
+ public String getHomepageUrl() {
+ return homepageUrl;
+ }
+
+ public void setHomepageUrl(String homepageUrl) {
+ this.homepageUrl = homepageUrl;
+ }
+
+ public void setUnfetched(int val) {
+ unfetched = val;
+ }
+
+ public int getUnfetched() {
+ return unfetched;
+ }
+
+ public void setFetched(int val) {
+ fetched = val;
+ }
+
+ public int getFetched() {
+ return fetched;
+ }
+
+ public void setNotModified(int val) {
+ notModified = val;
+ }
+
+ public int getNotModified() {
+ return notModified;
+ }
+
+ public void setRedirTemp(int val) {
+ redirTemp = val;
+ }
+
+ public int getRedirTemp() {
+ return redirTemp;
+ }
+
+ public void setRedirPerm(int val) {
+ redirPerm = val;
+ }
+
+ public int getRedirPerm() {
+ return redirPerm;
+ }
+
+ public void setGone(int val) {
+ gone = val;
+ }
+
+ public int getGone() {
+ return gone;
+ }
+
+ public void resetStatistics() {
+ setUnfetched(0);
+ setFetched(0);
+ setGone(0);
+ setRedirTemp(0);
+ setRedirPerm(0);
+ setNotModified(0);
+ }
+
+ public void setMetaData(org.apache.hadoop.io.MapWritable mapWritable) {
+ this.metaData = new org.apache.hadoop.io.MapWritable(mapWritable);
+ }
+
+ /**
+ * Adds all metadata from another HostDatum to this HostDatum.
+ *
+ * @param other the HostDatum to copy metadata from
+ */
+ public void putAllMetaData(HostDatum other) {
+ for (Entry<Writable, Writable> e : other.getMetaData().entrySet()) {
+ getMetaData().put(e.getKey(), e.getValue());
+ }
+ }
+
+ /**
+ * Returns the MapWritable if it was set or read in {@link #readFields(DataInput)},
+ * or an empty map in case this HostDatum was freshly created (lazily instantiated).
+ */
+ public org.apache.hadoop.io.MapWritable getMetaData() {
+ if (this.metaData == null) this.metaData = new org.apache.hadoop.io.MapWritable();
+ return this.metaData;
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ HostDatum result = (HostDatum)super.clone();
+ result.score = score;
+ result.lastCheck = lastCheck;
+ result.homepageUrl = homepageUrl;
+
+ result.dnsFailures = dnsFailures;
+ result.connectionFailures = connectionFailures;
+
+ result.unfetched = unfetched;
+ result.fetched = fetched;
+ result.notModified = notModified;
+ result.redirTemp = redirTemp;
+ result.redirPerm = redirPerm;
+ result.gone = gone;
+
+ result.metaData = metaData;
+
+ return result;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ score = in.readFloat();
+ lastCheck = new Date(in.readLong());
+ homepageUrl = Text.readString(in);
+
+ dnsFailures = in.readInt();
+ connectionFailures = in.readInt();
+
+ unfetched = in.readInt();
+ fetched = in.readInt();
+ notModified = in.readInt();
+ redirTemp = in.readInt();
+ redirPerm = in.readInt();
+ gone = in.readInt();
+
+ metaData = new org.apache.hadoop.io.MapWritable();
+ metaData.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeFloat(score);
+ out.writeLong(lastCheck.getTime());
+ Text.writeString(out, homepageUrl);
+
+ out.writeInt(dnsFailures);
+ out.writeInt(connectionFailures);
+
+ out.writeInt(unfetched);
+ out.writeInt(fetched);
+ out.writeInt(notModified);
+ out.writeInt(redirTemp);
+ out.writeInt(redirPerm);
+ out.writeInt(gone);
+
+ metaData.write(out);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(Integer.toString(getUnfetched()));
+ buf.append("\t");
+ buf.append(Integer.toString(getFetched()));
+ buf.append("\t");
+ buf.append(Integer.toString(getGone()));
+ buf.append("\t");
+ buf.append(Integer.toString(getRedirTemp()));
+ buf.append("\t");
+ buf.append(Integer.toString(getRedirPerm()));
+ buf.append("\t");
+ buf.append(Integer.toString(getNotModified()));
+ buf.append("\t");
+ buf.append(Integer.toString(numRecords()));
+ buf.append("\t");
+ buf.append(Integer.toString(getDnsFailures()));
+ buf.append("\t");
+ buf.append(Integer.toString(getConnectionFailures()));
+ buf.append("\t");
+ buf.append(Integer.toString(numFailures()));
+ buf.append("\t");
+ buf.append(Float.toString(score));
+ buf.append("\t");
+ buf.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastCheck));
+ buf.append("\t");
+ buf.append(homepageUrl);
+ buf.append("\t");
+ for (Entry<Writable, Writable> e : getMetaData().entrySet()) {
+ buf.append(e.getKey().toString());
+ buf.append(':');
+ buf.append(e.getValue().toString());
+ buf.append("|||");
+ }
+ return buf.toString();
+ }
+}
\ No newline at end of file
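
Usage note: HostDatum is a plain Hadoop Writable, so it round-trips through
write()/readFields(). A minimal, self-contained sketch (the score, date and
homepage are invented values):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Date;
import org.apache.nutch.hostdb.HostDatum;

public class HostDatumRoundTrip {
  public static void main(String[] args) throws IOException {
    HostDatum in = new HostDatum(1.25f, new Date(), "http://example.org/");

    // Serialize to a byte array, as Hadoop would between map and reduce
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    in.write(new DataOutputStream(bytes));

    // Deserialize into a fresh instance
    HostDatum out = new HostDatum();
    out.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));

    // Prints the tab-separated fields produced by toString()
    System.out.println(out);
  }
}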
Added: nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.MapContext;
+
+/**
+ * Tool to read and dump HostDB entries, optionally filtered by a Jexl
+ * expression.
+ *
+ * @see <a href="http://commons.apache.org/proper/commons-jexl/reference/syntax.html">Jexl syntax reference</a>
+ */
+public class ReadHostDb extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ReadHostDb.class);
+
+ public static final String HOSTDB_DUMP_HOSTNAMES = "hostdb.dump.hostnames";
+ public static final String HOSTDB_DUMP_HOMEPAGES = "hostdb.dump.homepages";
+ public static final String HOSTDB_FILTER_EXPRESSION = "hostdb.filter.expression";
+
+ static class ReadHostDbMapper extends Mapper<Text, HostDatum, Text, Text> {
+ protected boolean dumpHostnames = false;
+ protected boolean dumpHomepages = false;
+ protected Text emptyText = new Text();
+ protected Expression expr = null;
+
+ public void setup(Context context) {
+ dumpHomepages = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOMEPAGES, false);
+ dumpHostnames = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOSTNAMES, false);
+ String expr = context.getConfiguration().get(HOSTDB_FILTER_EXPRESSION);
+ if (expr != null) {
+ // Create or retrieve a JexlEngine
+ JexlEngine jexl = new JexlEngine();
+
+ // Evaluate silently (do not throw on errors) but with strict semantics
+ jexl.setSilent(true);
+ jexl.setStrict(true);
+
+ // Create an expression object
+ this.expr = jexl.createExpression(expr);
+ }
+ }
+
+ public void map(Text key, HostDatum datum, Context context) throws IOException, InterruptedException {
+ if (expr != null) {
+ // Create a context and add data
+ JexlContext jcontext = new MapContext();
+
+ // Set some fixed variables
+ jcontext.set("unfetched", datum.getUnfetched());
+ jcontext.set("fetched", datum.getFetched());
+ jcontext.set("gone", datum.getGone());
+ jcontext.set("redirTemp", datum.getRedirTemp());
+ jcontext.set("redirPerm", datum.getRedirPerm());
+ jcontext.set("redirs", datum.getRedirPerm() + datum.getRedirTemp());
+ jcontext.set("notModified", datum.getNotModified());
+ jcontext.set("ok", datum.getFetched() + datum.getNotModified());
+ jcontext.set("numRecords", datum.numRecords());
+ jcontext.set("dnsFailures", datum.getDnsFailures());
+ jcontext.set("connectionFailures", datum.getConnectionFailures());
+
+ // Set metadata variables
+ for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) {
+ Object value = entry.getValue();
+
+ if (value instanceof FloatWritable) {
+ FloatWritable fvalue = (FloatWritable)value;
+ Text tkey = (Text)entry.getKey();
+ jcontext.set(tkey.toString(), fvalue.get());
+ }
+
+ if (value instanceof IntWritable) {
+ IntWritable ivalue = (IntWritable)value;
+ Text tkey = (Text)entry.getKey();
+ jcontext.set(tkey.toString(), ivalue.get());
+ }
+ }
+
+ // Filter this record if evaluation did not pass
+ try {
+ if (!Boolean.TRUE.equals(expr.evaluate(jcontext))) {
+ return;
+ }
+ } catch (Exception e) {
+ LOG.info(e.toString() + " for " + key.toString());
+ }
+ }
+
+ if (dumpHomepages) {
+ if (datum.hasHomepageUrl()) {
+ context.write(new Text(datum.getHomepageUrl()), emptyText);
+ }
+ return;
+ }
+
+ if (dumpHostnames) {
+ context.write(key, emptyText);
+ return;
+ }
+
+ // Write anyway
+ context.write(key, new Text(datum.toString()));
+ }
+ }
+
+ // TODO: reduce unknown hosts to a single unknown domain if possible; enable via configuration
+ // host_a.example.org, host_b.example.org ==> example.org
+// static class ReadHostDbReducer extends Reducer<Text, Text, Text, Text> {
+// public void setup(Context context) { }
+//
+// public void reduce(Text domain, Iterable<Text> hosts, Context context) throws IOException, InterruptedException {
+//
+// }
+// }
+
+ private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean dumpHostnames, String expr) throws Exception {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("ReadHostDb: starting at " + sdf.format(start));
+
+ Configuration conf = getConf();
+ conf.setBoolean(HOSTDB_DUMP_HOMEPAGES, dumpHomepages);
+ conf.setBoolean(HOSTDB_DUMP_HOSTNAMES, dumpHostnames);
+ if (expr != null) {
+ conf.set(HOSTDB_FILTER_EXPRESSION, expr);
+ }
+ conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+ conf.set("mapred.textoutputformat.separator", "\t");
+
+ Job job = new Job(conf, "ReadHostDb");
+ job.setJarByClass(ReadHostDb.class);
+
+ FileInputFormat.addInputPath(job, new Path(hostDb, "current"));
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(ReadHostDbMapper.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ long end = System.currentTimeMillis();
+ LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String args[]) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new ReadHostDb(), args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("Usage: ReadHostDb <hostdb> <output> [-dumpHomepages | -dumpHostnames | -expr <expr.>]");
+ return -1;
+ }
+
+ boolean dumpHomepages = false;
+ boolean dumpHostnames = false;
+ String expr = null;
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-dumpHomepages")) {
+ LOG.info("ReadHostDb: dumping homepage URL's");
+ dumpHomepages = true;
+ }
+ if (args[i].equals("-dumpHostnames")) {
+ LOG.info("ReadHostDb: dumping hostnames");
+ dumpHostnames = true;
+ }
+ if (args[i].equals("-expr")) {
+ expr = args[i + 1];
+ LOG.info("ReadHostDb: evaluating expression: " + expr);
+ i++;
+ }
+ }
+
+ try {
+ readHostDb(new Path(args[0]), new Path(args[1]), dumpHomepages, dumpHostnames, expr);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("ReadHostDb: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+}
\ No newline at end of file
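
Usage note: -expr evaluates a Jexl expression against the variables the
mapper sets above (unfetched, fetched, gone, redirTemp, redirPerm, redirs,
notModified, ok, numRecords, dnsFailures, connectionFailures, plus any
numeric metadata fields). An illustrative run that dumps only hosts with
many unfetched pages and no DNS trouble:

  bin/nutch readhostdb crawl/hostdb crawl/hostdb-dump -expr "unfetched > 1000 && dnsFailures == 0"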
Added: nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple runnable that performs DNS lookup for a single host.
+ */
+public class ResolverThread implements Runnable {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ResolverThread.class);
+
+ protected String host = null;
+ protected HostDatum datum = null;
+ protected Text hostText = new Text();
+ protected OutputCollector<Text,HostDatum> output;
+ protected Reporter reporter;
+ protected int purgeFailedHostsThreshold;
+
+ /**
+ * Constructor.
+ */
+ public ResolverThread(String host, HostDatum datum,
+ OutputCollector<Text,HostDatum> output, Reporter reporter, int purgeFailedHostsThreshold) {
+
+ hostText.set(host);
+ this.host = host;
+ this.datum = datum;
+ this.output = output;
+ this.reporter = reporter;
+ this.purgeFailedHostsThreshold = purgeFailedHostsThreshold;
+ }
+
+ /**
+ * Resolves the host, updates the datum and counters accordingly and
+ * emits the datum unless the host is purged.
+ */
+ public void run() {
+ // Resolve the host and act appropriately
+ try {
+ // Throws an exception if host is not found
+ InetAddress inetAddr = InetAddress.getByName(host);
+
+ if (datum.isEmpty()) {
+ reporter.incrCounter("UpdateHostDb", "new_known_host" ,1);
+ datum.setLastCheck();
+ LOG.info(host + ": new_known_host " + datum);
+ } else if (datum.getDnsFailures() > 0) {
+ reporter.incrCounter("UpdateHostDb", "rediscovered_host" ,1);
+ datum.setLastCheck();
+ datum.setDnsFailures(0);
+ LOG.info(host + ": rediscovered_host " + datum);
+ } else {
+ reporter.incrCounter("UpdateHostDb", "existing_known_host", 1);
+ datum.setLastCheck();
+ LOG.info(host + ": existing_known_host " + datum);
+ }
+
+ // Write the host datum
+ output.collect(hostText, datum);
+ } catch (UnknownHostException e) {
+ try {
+ // If the datum is empty we initialize it with lastCheck = today and 1 failure
+ if (datum.isEmpty()) {
+ datum.setLastCheck();
+ datum.setDnsFailures(1);
+ output.collect(hostText, datum);
+ reporter.incrCounter("UpdateHostDb", "new_unknown_host", 1);
+ LOG.info(host + ": new_unknown_host " + datum);
+ } else {
+ datum.setLastCheck();
+ datum.incDnsFailures();
+
+ // Check if this host should be forgotten: keep it while the failure
+ // count stays within the threshold (-1 disables purging)
+ if (purgeFailedHostsThreshold == -1 ||
+ datum.getDnsFailures() <= purgeFailedHostsThreshold) {
+
+ output.collect(hostText, datum);
+ reporter.incrCounter("UpdateHostDb", "existing_unknown_host" ,1);
+ LOG.info(host + ": existing_unknown_host " + datum);
+ } else {
+ reporter.incrCounter("UpdateHostDb", "purged_unknown_host" ,1);
+ LOG.info(host + ": purged_unknown_host " + datum);
+ }
+ }
+
+ reporter.incrCounter("UpdateHostDb",
+ Integer.toString(datum.numFailures()) + "_times_failed", 1);
+ } catch (Exception ioe) {
+ LOG.warn(StringUtils.stringifyException(ioe));
+ }
+ } catch (Exception e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+
+ reporter.incrCounter("UpdateHostDb", "checked_hosts", 1);
+ }
+}
\ No newline at end of file
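
Note: the outcome of a check hinges on java.net.InetAddress.getByName(),
which throws UnknownHostException when resolution fails. A stripped-down,
standalone sketch of that branch (the host name is an example):

import java.net.InetAddress;
import java.net.UnknownHostException;

public class ResolveCheck {
  public static void main(String[] args) {
    String host = args.length > 0 ? args[0] : "example.org";
    try {
      // Throws if the host cannot be resolved
      InetAddress.getByName(host);
      System.out.println(host + ": known host");
    } catch (UnknownHostException e) {
      System.out.println(host + ": unknown host, dnsFailures would be incremented");
    }
  }
}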
Added: nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tool to create a HostDB from the CrawlDB. It aggregates fetch status values
+ * by host and checks DNS entries for hosts.
+ */
+public class UpdateHostDb extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDb.class);
+ public static final String LOCK_NAME = ".locked";
+
+ public static final String HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD = "hostdb.purge.failed.hosts.threshold";
+ public static final String HOSTDB_NUM_RESOLVER_THREADS = "hostdb.num.resolvers.threads";
+ public static final String HOSTDB_RECHECK_INTERVAL = "hostdb.recheck.interval";
+ public static final String HOSTDB_CHECK_FAILED = "hostdb.check.failed";
+ public static final String HOSTDB_CHECK_NEW = "hostdb.check.new";
+ public static final String HOSTDB_CHECK_KNOWN = "hostdb.check.known";
+ public static final String HOSTDB_FORCE_CHECK = "hostdb.force.check";
+ public static final String HOSTDB_URL_FILTERING = "hostdb.url.filter";
+ public static final String HOSTDB_URL_NORMALIZING = "hostdb.url.normalize";
+ public static final String HOSTDB_NUMERIC_FIELDS = "hostdb.numeric.fields";
+ public static final String HOSTDB_STRING_FIELDS = "hostdb.string.fields";
+ public static final String HOSTDB_PERCENTILES = "hostdb.percentiles";
+
+ private void updateHostDb(Path hostDb, Path crawlDb, Path topHosts,
+ boolean checkFailed, boolean checkNew, boolean checkKnown,
+ boolean force, boolean filter, boolean normalize) throws Exception {
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("UpdateHostDb: starting at " + sdf.format(start));
+
+ JobConf job = new NutchJob(getConf());
+ boolean preserveBackup = job.getBoolean("db.preserve.backup", true);
+ job.setJarByClass(UpdateHostDb.class);
+ job.setJobName("UpdateHostDb");
+
+ // Check whether the urlfilter-domainblacklist plugin is loaded
+ if (filter && "urlfilter-domainblacklist".matches(job.get("plugin.includes"))) {
+ throw new Exception("urlfilter-domainblacklist must not be enabled");
+ }
+
+ // Check whether the urlnormalizer-host plugin is loaded
+ if (normalize && "urlnormalizer-host".matches(job.get("plugin.includes"))) {
+ throw new Exception("urlnormalizer-host must not be enabled");
+ }
+
+ FileSystem fs = FileSystem.get(job);
+ Path old = new Path(hostDb, "old");
+ Path current = new Path(hostDb, "current");
+ Path tempHostDb = new Path(hostDb, "hostdb-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ // lock an existing hostdb to prevent multiple simultaneous updates
+ Path lock = new Path(hostDb, LOCK_NAME);
+ if (!fs.exists(current)) {
+ fs.mkdirs(current);
+ }
+ LockUtil.createLockFile(fs, lock, false);
+
+ MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
+
+ if (topHosts != null) {
+ MultipleInputs.addInputPath(job, topHosts, KeyValueTextInputFormat.class);
+ }
+ if (crawlDb != null) {
+ // Tell the job we read from CrawlDB
+ job.setBoolean("hostdb.reading.crawldb", true);
+ MultipleInputs.addInputPath(job, new Path(crawlDb,
+ CrawlDb.CURRENT_NAME), SequenceFileInputFormat.class);
+ }
+
+ FileOutputFormat.setOutputPath(job, tempHostDb);
+
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(NutchWritable.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(HostDatum.class);
+ job.setMapperClass(UpdateHostDbMapper.class);
+ job.setReducerClass(UpdateHostDbReducer.class);
+
+ job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+ job.setSpeculativeExecution(false);
+ job.setBoolean(HOSTDB_CHECK_FAILED, checkFailed);
+ job.setBoolean(HOSTDB_CHECK_NEW, checkNew);
+ job.setBoolean(HOSTDB_CHECK_KNOWN, checkKnown);
+ job.setBoolean(HOSTDB_FORCE_CHECK, force);
+ job.setBoolean(HOSTDB_URL_FILTERING, filter);
+ job.setBoolean(HOSTDB_URL_NORMALIZING, normalize);
+ job.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+ try {
+ JobClient.runJob(job);
+
+ FSUtils.replace(fs, old, current, true);
+ FSUtils.replace(fs, current, tempHostDb, true);
+
+ if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
+ } catch (Exception e) {
+ if (fs.exists(tempHostDb)) {
+ fs.delete(tempHostDb, true);
+ }
+ LockUtil.removeLockFile(fs, lock);
+ throw e;
+ }
+
+ LockUtil.removeLockFile(fs, lock);
+ long end = System.currentTimeMillis();
+ LOG.info("UpdateHostDb: finished at " + sdf.format(end) +
+ ", elapsed: " + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String args[]) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new UpdateHostDb(), args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("Usage: UpdateHostDb -hostdb <hostdb> " +
+ "[-tophosts <tophosts>] [-crawldb <crawldb>] [-checkAll] [-checkFailed]" +
+ " [-checkNew] [-checkKnown] [-force] [-filter] [-normalize]");
+ return -1;
+ }
+
+ Path hostDb = null;
+ Path crawlDb = null;
+ Path topHosts = null;
+
+ boolean checkFailed = false;
+ boolean checkNew = false;
+ boolean checkKnown = false;
+ boolean force = false;
+
+ boolean filter = false;
+ boolean normalize = false;
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-hostdb")) {
+ hostDb = new Path(args[i + 1]);
+ LOG.info("UpdateHostDb: hostdb: " + hostDb);
+ i++;
+ }
+ if (args[i].equals("-crawldb")) {
+ crawlDb = new Path(args[i + 1]);
+ LOG.info("UpdateHostDb: crawldb: " + crawlDb);
+ i++;
+ }
+ if (args[i].equals("-tophosts")) {
+ topHosts = new Path(args[i + 1]);
+ LOG.info("UpdateHostDb: tophosts: " + topHosts);
+ i++;
+ }
+
+ if (args[i].equals("-checkFailed")) {
+ LOG.info("UpdateHostDb: checking failed hosts");
+ checkFailed = true;
+ }
+ if (args[i].equals("-checkNew")) {
+ LOG.info("UpdateHostDb: checking new hosts");
+ checkNew = true;
+ }
+ if (args[i].equals("-checkKnown")) {
+ LOG.info("UpdateHostDb: checking known hosts");
+ checkKnown = true;
+ }
+ if (args[i].equals("-checkAll")) {
+ LOG.info("UpdateHostDb: checking all hosts");
+ checkFailed = true;
+ checkNew = true;
+ checkKnown = true;
+ }
+ if (args[i].equals("-force")) {
+ LOG.info("UpdateHostDb: forced check");
+ force = true;
+ }
+ if (args[i].equals("-filter")) {
+ LOG.info("UpdateHostDb: filtering enabled");
+ filter = true;
+ }
+ if (args[i].equals("-normalize")) {
+ LOG.info("UpdateHostDb: normalizing enabled");
+ normalize = true;
+ }
+ }
+
+ if (hostDb == null) {
+ System.err.println("hostDb is mandatory");
+ return -1;
+ }
+
+ try {
+ updateHostDb(hostDb, crawlDb, topHosts, checkFailed, checkNew,
+ checkKnown, force, filter, normalize);
+
+ return 0;
+ } catch (Exception e) {
+ LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+}
\ No newline at end of file
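
Usage note: -tophosts is read with KeyValueTextInputFormat, i.e. one
tab-separated host/score pair per line, and the mapper parses the value
with Float.parseFloat. A hypothetical input file (the scores are invented):

  example.org	2.5
  example.net	0.8

Such a file can, for instance, be produced by the WebGraph NodeDumper tool
mentioned in the mapper's javadoc below.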
Added: nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.util.URLUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper ingesting HostDB and CrawlDB entries. Additionally it can also read
+ * host score info from a plain text key/value file generated by the
+ * Webgraph's NodeDumper tool.
+ */
+public class UpdateHostDbMapper
+ implements Mapper<Text, Writable, Text, NutchWritable> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbMapper.class);
+ protected Text host = new Text();
+ protected HostDatum hostDatum = null;
+ protected CrawlDatum crawlDatum = null;
+ protected String reprUrl = null;
+ protected String buffer = null;
+ protected String[] args = null;
+ protected boolean filter = false;
+ protected boolean normalize = false;
+ protected boolean readingCrawlDb = false;
+ protected URLFilters filters = null;
+ protected URLNormalizers normalizers = null;
+
+ public void close() {}
+
+ /**
+ * Reads filtering and normalizing settings from the job configuration.
+ *
+ * @param job the job configuration
+ */
+ public void configure(JobConf job) {
+ readingCrawlDb = job.getBoolean("hostdb.reading.crawldb", false);
+ filter = job.getBoolean(UpdateHostDb.HOSTDB_URL_FILTERING, false);
+ normalize = job.getBoolean(UpdateHostDb.HOSTDB_URL_NORMALIZING, false);
+
+ if (filter)
+ filters = new URLFilters(job);
+ if (normalize)
+ normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_DEFAULT);
+ }
+
+ /**
+ * Filters and/or normalizes the input URL.
+ *
+ * @param url the host to filter and normalize
+ * @return the filtered and normalized host, or null if it was filtered out
+ */
+ protected String filterNormalize(String url) {
+ // We actually receive a hostname here so let's make a URL
+ // TODO: we force shop.fcgroningen to be https, how do we know that here?
+ // http://issues.openindex.io/browse/SPIDER-40
+ url = "http://" + url + "/";
+
+ try {
+ if (normalize)
+ url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
+ if (filter)
+ url = filters.filter(url);
+ if (url == null)
+ return null;
+ } catch (Exception e) {
+ return null;
+ }
+
+ // Turn back to host
+ return URLUtil.getHost(url);
+ }
+
+ /**
+ * Mapper ingesting records from the HostDB, CrawlDB and plaintext host
+ * scores file. Statistics and scores are passed on.
+ *
+ * @param key the record key: a URL or hostname
+ * @param value a CrawlDatum, HostDatum or Text score record
+ * @param output the collector for host/NutchWritable pairs
+ * @param reporter the progress reporter
+ */
+ public void map(Text key, Writable value,
+ OutputCollector<Text,NutchWritable> output, Reporter reporter)
+ throws IOException {
+
+ // Get the key!
+ String keyStr = key.toString();
+
+ // Check if we process records from the CrawlDB
+ if (key instanceof Text && value instanceof CrawlDatum) {
+ // Get the normalized and filtered host of this URL
+ buffer = filterNormalize(URLUtil.getHost(keyStr));
+
+ // Filtered out?
+ if (buffer == null) {
+ reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+ LOG.info("UpdateHostDb: " + URLUtil.getHost(keyStr) + " crawldatum has been filtered");
+ return;
+ }
+
+ // Set the host of this URL
+ host.set(buffer);
+ crawlDatum = (CrawlDatum)value;
+ hostDatum = new HostDatum();
+
+ /**
+ * TODO: fix multi redirects: host_a => host_b/page => host_c/page/whatever
+ * http://www.ferienwohnung-armbruster.de/
+ * http://www.ferienwohnung-armbruster.de/website/
+ * http://www.ferienwohnung-armbruster.de/website/willkommen.php
+ *
+ * We cannot reresolve redirects for host objects as CrawlDatum metadata is
+ * not available. We also cannot reliably use the reducer in all cases
+ * since redirects may be across hosts or even domains. The example
+ * above has redirects that will end up in the same reducer. During that
+ * phase, however, we do not know which URL redirects to the next URL.
+ */
+ // Do not resolve homepages when the root URL is unfetched
+ if (crawlDatum.getStatus() != CrawlDatum.STATUS_DB_UNFETCHED) {
+ // Get the protocol
+ String protocol = URLUtil.getProtocol(keyStr);
+
+ // Get the proposed homepage URL
+ String homepage = protocol + "://" + buffer + "/";
+
+ // Check if the current key equals the proposed homepage
+ if (keyStr.equals(homepage)) {
+ // Check if this is a redirect to the real home page
+ if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM ||
+ crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+
+ // Obtain the repr url for this redirect via protocolstatus from the metadata
+ ProtocolStatus z = (ProtocolStatus)crawlDatum.getMetaData().
+ get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+
+ // Get the protocol status' arguments
+ args = z.getArgs();
+
+ // ..and the possible redirect URL
+ reprUrl = args[0];
+
+ // Did we get a redirect target?
+ if (reprUrl != null) {
+ LOG.info("UpdateHostDb: homepage: " + keyStr + " redirects to: " + args[0]);
+ // Set the redirect target as homepage before emitting the datum
+ hostDatum.setHomepageUrl(reprUrl);
+ output.collect(host, new NutchWritable(hostDatum));
+ } else {
+ LOG.info("UpdateHostDb: homepage: " + keyStr +
+ " redirects to: " + args[0] + " but has been filtered out");
+ }
+ } else {
+ hostDatum.setHomepageUrl(homepage);
+ output.collect(host, new NutchWritable(hostDatum));
+ LOG.info("UpdateHostDb: homepage: " + homepage);
+ }
+ }
+ }
+
+ // Always emit crawl datum
+ output.collect(host, new NutchWritable(crawlDatum));
+ }
+
+ // Check if we got a record from the hostdb
+ if (key instanceof Text && value instanceof HostDatum) {
+ buffer = filterNormalize(keyStr);
+
+ // Filtered out?
+ if (buffer == null) {
+ reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+ LOG.info("UpdateHostDb: " + key.toString() + " hostdatum has been filtered");
+ return;
+ }
+
+ // Get a HostDatum
+ hostDatum = (HostDatum)value;
+ key.set(buffer);
+
+ // If we're also reading CrawlDb entries, reset db_* statistics because
+ // we're aggregating them from CrawlDB anyway
+ if (readingCrawlDb) {
+ hostDatum.resetStatistics();
+ }
+
+ output.collect(key, new NutchWritable(hostDatum));
+ }
+
+ // Check if we got a record with host scores
+ if (key instanceof Text && value instanceof Text) {
+ buffer = filterNormalize(keyStr);
+
+ // Filtered out?
+ if (buffer == null) {
+ reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+ LOG.info("UpdateHostDb: " + key.toString() + " score has been filtered");
+ return;
+ }
+
+ key.set(buffer);
+
+ output.collect(key,
+ new NutchWritable(new FloatWritable(Float.parseFloat(value.toString()))));
+ }
+ }
+}
\ No newline at end of file
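
Note on homepage detection: for a fetched key such as http://example.org/
the mapper proposes the key itself as homepage; for a permanent or
temporary redirect it instead reads the target from the ProtocolStatus
stored in the CrawlDatum metadata. An illustrative flow (URLs invented):

  http://example.org/  STATUS_DB_REDIR_PERM  ->  args[0] = http://www.example.org/
  homepage recorded for host example.org: http://www.example.org/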
Added: nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+
+import com.tdunning.math.stats.TDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reducer aggregating CrawlDatum status counts and metadata per host,
+ * merging existing HostDatum records and WebGraph scores, and scheduling
+ * DNS checks through a pool of resolver threads.
+ */
+public class UpdateHostDbReducer
+ implements Reducer<Text, NutchWritable, Text, HostDatum> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbReducer.class);
+ protected ResolverThread resolverThread = null;
+ protected Integer numResolverThreads = 10;
+ protected static Integer purgeFailedHostsThreshold = -1;
+ protected static Integer recheckInterval = 86400000;
+ protected static boolean checkFailed = false;
+ protected static boolean checkNew = false;
+ protected static boolean checkKnown = false;
+ protected static boolean force = false;
+ protected static long now = new Date().getTime();
+ protected static String[] numericFields;
+ protected static String[] stringFields;
+ protected static int[] percentiles;
+ protected static Text[] numericFieldWritables;
+ protected static Text[] stringFieldWritables;
+
+ protected BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+ protected ThreadPoolExecutor executor = null;
+
+ /**
+ * Configures the thread pool and prestarts all resolver threads.
+ *
+ * @param job the job configuration
+ */
+ public void configure(JobConf job) {
+ purgeFailedHostsThreshold = job.getInt(UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1);
+ numResolverThreads = job.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, 10);
+ // The recheck interval is configured in milliseconds (see nutch-default.xml)
+ recheckInterval = job.getInt(UpdateHostDb.HOSTDB_RECHECK_INTERVAL, 86400000);
+ checkFailed = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_FAILED, false);
+ checkNew = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_NEW, false);
+ checkKnown = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_KNOWN, false);
+ force = job.getBoolean(UpdateHostDb.HOSTDB_FORCE_CHECK, false);
+ numericFields = job.getStrings(UpdateHostDb.HOSTDB_NUMERIC_FIELDS);
+ stringFields = job.getStrings(UpdateHostDb.HOSTDB_STRING_FIELDS);
+ percentiles = job.getInts(UpdateHostDb.HOSTDB_PERCENTILES);
+
+ // What fields do we need to collect metadata from
+ if (numericFields != null) {
+ numericFieldWritables = new Text[numericFields.length];
+ for (int i = 0; i < numericFields.length; i++) {
+ numericFieldWritables[i] = new Text(numericFields[i]);
+ }
+ }
+
+ if (stringFields != null) {
+ stringFieldWritables = new Text[stringFields.length];
+ for (int i = 0; i < stringFields.length; i++) {
+ stringFieldWritables[i] = new Text(stringFields[i]);
+ }
+ }
+
+ // Initialize the thread pool with our queue
+ executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads,
+ 5, TimeUnit.SECONDS, queue);
+
+ // Run all threads in the pool
+ executor.prestartAllCoreThreads();
+ }
+
+ /**
+ * Aggregates all values for a host and either queues the host for a DNS
+ * check or writes its datum directly.
+ */
+ public void reduce(Text key, Iterator<NutchWritable> values,
+ OutputCollector<Text,HostDatum> output, Reporter reporter) throws IOException {
+
+ Map<String,Map<String,Integer>> stringCounts = new HashMap<String,Map<String, Integer>>();
+ Map<String,Float> maximums = new HashMap<String,Float>();
+ Map<String,Float> sums = new HashMap<String,Float>(); // used to calc averages
+ Map<String,Integer> counts = new HashMap<String,Integer>(); // used to calc averages
+ Map<String,Float> minimums = new HashMap<String,Float>();
+ Map<String,TDigest> tdigests = new HashMap<String,TDigest>();
+
+ HostDatum hostDatum = new HostDatum();
+ float score = 0;
+
+ if (stringFields != null) {
+ for (int i = 0; i < stringFields.length; i++) {
+ stringCounts.put(stringFields[i], new HashMap<String,Integer>());
+ }
+ }
+
+ // Loop through all values, aggregating CrawlDatum statistics and merging
+ // any existing HostDatum; new hosts start from an empty datum
+ while (values.hasNext()) {
+ Writable value = values.next().get();
+
+ // Count crawl datum status's and collect metadata from fields
+ if (value instanceof CrawlDatum) {
+ CrawlDatum buffer = (CrawlDatum)value;
+
+ // Set the correct status field
+ switch (buffer.getStatus()) {
+ case CrawlDatum.STATUS_DB_UNFETCHED:
+ hostDatum.setUnfetched(hostDatum.getUnfetched() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_FETCHED:
+ hostDatum.setFetched(hostDatum.getFetched() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_GONE:
+ hostDatum.setGone(hostDatum.getGone() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_REDIR_TEMP:
+ hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_REDIR_PERM:
+ hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_NOTMODIFIED:
+ hostDatum.setNotModified(hostDatum.getNotModified() + 1);
+ break;
+ }
+
+ // Record connection failures
+ if (buffer.getRetriesSinceFetch() != 0) {
+ hostDatum.incConnectionFailures();
+ }
+
+ // Only gather metadata statistics for proper fetched pages
+ if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+ // Deal with the string fields
+ if (stringFields != null) {
+ for (int i = 0; i < stringFields.length; i++) {
+ // Does this field exist?
+ if (buffer.getMetaData().get(stringFieldWritables[i]) != null) {
+ // Get it!
+ String metadataValue = null;
+ try {
+ metadataValue = buffer.getMetaData().get(stringFieldWritables[i]).toString();
+ } catch (Exception e) {
+ LOG.error("Metadata field " + stringFields[i] + " is probably not a numeric value");
+ }
+
+ // Does the value exist?
+ if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) {
+ // Yes, increment it
+ stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1);
+ } else {
+ // Create it!
+ stringCounts.get(stringFields[i]).put(metadataValue, 1);
+ }
+ }
+ }
+ }
+
+ // Deal with the numeric fields
+ if (numericFields != null) {
+ for (int i = 0; i < numericFields.length; i++) {
+ // Does this field exist?
+ if (buffer.getMetaData().get(numericFieldWritables[i]) != null) {
+ try {
+ // Get it!
+ Float metadataValue = Float.parseFloat(buffer.getMetaData().get(numericFieldWritables[i]).toString());
+
+ // Does a digest for this field exist?
+ if (tdigests.containsKey(numericFields[i])) {
+ tdigests.get(numericFields[i]).add(metadataValue);
+ } else {
+ // Create it!
+ TDigest tdigest = TDigest.createDigest(100);
+ tdigest.add((double)metadataValue);
+ tdigests.put(numericFields[i], tdigest);
+ }
+
+ // Does the minimum value exist?
+ if (minimums.containsKey(numericFields[i])) {
+ // Write if this is lower than existing value
+ if (metadataValue < minimums.get(numericFields[i])) {
+ minimums.put(numericFields[i], metadataValue);
+ }
+ } else {
+ // Create it!
+ minimums.put(numericFields[i], metadataValue);
+ }
+
+ // Does the maximum value exist?
+ if (maximums.containsKey(numericFields[i])) {
+ // Write if this is higher than existing value
+ if (metadataValue > maximums.get(numericFields[i])) {
+ maximums.put(numericFields[i], metadataValue);
+ }
+ } else {
+ // Create it!
+ maximums.put(numericFields[i], metadataValue);
+ }
+
+ // Sum it up!
+ if (sums.containsKey(numericFields[i])) {
+ // Increment
+ sums.put(numericFields[i], sums.get(numericFields[i]) + metadataValue);
+ counts.put(numericFields[i], counts.get(numericFields[i]) + 1);
+ } else {
+ // Create it!
+ sums.put(numericFields[i], metadataValue);
+ counts.put(numericFields[i], 1);
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage() + " when processing values for " + key.toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Merge fields from an existing HostDatum record
+ if (value instanceof HostDatum) {
+ HostDatum buffer = (HostDatum)value;
+
+ // Check homepage URL
+ if (buffer.hasHomepageUrl()) {
+ hostDatum.setHomepageUrl(buffer.getHomepageUrl());
+ }
+
+ // Check lastCheck timestamp
+ if (!buffer.isEmpty()) {
+ hostDatum.setLastCheck(buffer.getLastCheck());
+ }
+
+ // Check and set DNS failures
+ if (buffer.getDnsFailures() > 0) {
+ hostDatum.setDnsFailures(buffer.getDnsFailures());
+ }
+
+ // Check and set connection failures
+ if (buffer.getConnectionFailures() > 0) {
+ hostDatum.setConnectionFailures(buffer.getConnectionFailures());
+ }
+
+ // Check metadata
+ if (!buffer.getMetaData().isEmpty()) {
+ hostDatum.setMetaData(buffer.getMetaData());
+ }
+
+ // Check and set score (score from Web Graph has precedence)
+ if (buffer.getScore() > 0) {
+ hostDatum.setScore(buffer.getScore());
+ }
+ }
+
+ // Check for the score
+ if (value instanceof FloatWritable) {
+ FloatWritable buffer = (FloatWritable)value;
+ score = buffer.get();
+ }
+ }
+
+ // Check if score was set from Web Graph
+ if (score > 0) {
+ hostDatum.setScore(score);
+ }
+
+ // Set metadata
+ for (Map.Entry<String, Map<String,Integer>> entry : stringCounts.entrySet()) {
+ for (Map.Entry<String,Integer> subEntry : entry.getValue().entrySet()) {
+ hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new IntWritable(subEntry.getValue()));
+ }
+ }
+ for (Map.Entry<String, Float> entry : maximums.entrySet()) {
+ hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue()));
+ }
+ for (Map.Entry<String, Float> entry : sums.entrySet()) {
+ hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / counts.get(entry.getKey())));
+ }
+ for (Map.Entry<String, TDigest> entry : tdigests.entrySet()) {
+ // Emit all percentiles
+ for (int i = 0; i < percentiles.length; i++) {
+ hostDatum.getMetaData().put(new Text("pct" + Integer.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile(0.5)));
+ }
+ }
+ for (Map.Entry<String, Float> entry : minimums.entrySet()) {
+ hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue()));
+ }
+
+ reporter.incrCounter("UpdateHostDb", "total_hosts", 1);
+
+ // See if this record is to be checked
+ if (shouldCheck(hostDatum)) {
+ // Make an entry
+ resolverThread = new ResolverThread(key.toString(), hostDatum, output, reporter, purgeFailedHostsThreshold);
+
+ // Add the entry to the queue (blocking)
+ try {
+ queue.put(resolverThread);
+ } catch (InterruptedException e) {
+ LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+ }
+
+ // Do not progress, the datum will be written in the resolver thread
+ return;
+ } else {
+ reporter.incrCounter("UpdateHostDb", "skipped_not_eligible", 1);
+ LOG.info("UpdateHostDb: " + key.toString() + ": skipped_not_eligible");
+ }
+
+ // Write the host datum if it wasn't written by the resolver thread
+ output.collect(key, hostDatum);
+ }
+
+ /**
+ * Determines whether a record should be checked.
+ *
+ * @param datum the HostDatum to inspect
+ * @return true if the host should be checked
+ */
+ protected boolean shouldCheck(HostDatum datum) {
+ // Whether a new record is to be checked
+ if (checkNew && datum.isEmpty()) {
+ return true;
+ }
+
+ // Whether existing known hosts should be rechecked
+ if (checkKnown && !datum.isEmpty() && datum.getDnsFailures() == 0) {
+ return isEligibleForCheck(datum);
+ }
+
+ // Whether failed records are forced to be rechecked
+ if (checkFailed && datum.getDnsFailures() > 0) {
+ return isEligibleForCheck(datum);
+ }
+
+ // It seems this record is not to be checked
+ return false;
+ }
+
+ /**
+ * Determines whether a record is eligible for recheck.
+ *
+ * @param datum the HostDatum to inspect
+ * @return true if the host is eligible for recheck
+ */
+ protected boolean isEligibleForCheck(HostDatum datum) {
+ // Recheck when forced, or when the recheck interval (scaled by the
+ // number of DNS failures + 1) has elapsed since the last check
+ if (force || datum.getLastCheck().getTime() +
+ (recheckInterval * (datum.getDnsFailures() + 1)) < now) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Shut down all running threads and wait for completion.
+ */
+ public void close() {
+ LOG.info("UpdateHostDb: feeder finished, waiting for shutdown");
+
+ // If we're here all keys have been fed and we can issue a shut down
+ executor.shutdown();
+
+ boolean finished = false;
+
+ // Wait until all resolvers have finished
+ while (!finished) {
+ try {
+ // Wait for the executor to shut down completely
+ if (!executor.isTerminated()) {
+ LOG.info("UpdateHostDb: resolver threads waiting: " + Integer.toString(executor.getPoolSize()));
+ Thread.sleep(1000);
+ } else {
+ // All is well, get out
+ finished = true;
+ }
+ } catch (InterruptedException e) {
+ // Interrupted while waiting; log and keep waiting
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+ }
+ }
+}
\ No newline at end of file
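
Note: percentile aggregation relies on the t-digest dependency added to
ivy.xml above. A minimal standalone sketch using the same calls as the
reducer (the sample values are invented):

import com.tdunning.math.stats.TDigest;

public class PercentileSketch {
  public static void main(String[] args) {
    // Compression factor 100, as used in the reducer
    TDigest digest = TDigest.createDigest(100);
    for (double v : new double[] { 12, 85, 110, 240, 900 }) {
      digest.add(v);
    }
    // quantile() takes a fraction, hence percentile / 100.0
    System.out.println("p50 = " + digest.quantile(0.50));
    System.out.println("p95 = " + digest.quantile(0.95));
  }
}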