You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2016/01/21 14:58:56 UTC

svn commit: r1725952 - in /nutch/trunk: ./ conf/ ivy/ src/bin/ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/hostdb/

Author: markus
Date: Thu Jan 21 13:58:56 2016
New Revision: 1725952

URL: http://svn.apache.org/viewvc?rev=1725952&view=rev
Log:
NUTCH-1325 HostDB for Nutch

Added:
    nutch/trunk/src/java/org/apache/nutch/hostdb/
    nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java
    nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java
    nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java
    nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
    nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
    nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/conf/log4j.properties
    nutch/trunk/conf/nutch-default.xml
    nutch/trunk/ivy/ivy.xml
    nutch/trunk/src/bin/nutch
    nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Thu Jan 21 13:58:56 2016
@@ -1,5 +1,7 @@
 Nutch Change Log
 
+* NUTCH-1325 HostDB for Nutch (Gui Forget, markus, tejasp)
+
 * NUTCH-2203 Suffix URL filter can't handle trailing/leading whitespaces (Jurian Broertjes via markus)
 
 * NUTCH-2194 Run IndexingFilterChecker as simple Telnet server (markus)

Modified: nutch/trunk/conf/log4j.properties
URL: http://svn.apache.org/viewvc/nutch/trunk/conf/log4j.properties?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/conf/log4j.properties (original)
+++ nutch/trunk/conf/log4j.properties Thu Jan 21 13:58:56 2016
@@ -57,6 +57,8 @@ log4j.logger.org.apache.nutch.segment.Se
 log4j.logger.org.apache.nutch.segment.SegmentReader=INFO,cmdstdout
 log4j.logger.org.apache.nutch.tools.FreeGenerator=INFO,cmdstdout
 log4j.logger.org.apache.nutch.util.domain.DomainStatistics=INFO,cmdstdout
+log4j.logger.org.apache.nutch.hostdb.UpdateHostDb=INFO,cmdstdout
+log4j.logger.org.apache.nutch.hostdb.ReadHostDb=INFO,cmdstdout
 
 log4j.logger.org.apache.nutch=INFO
 log4j.logger.org.apache.hadoop=WARN

Modified: nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/conf/nutch-default.xml?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/conf/nutch-default.xml (original)
+++ nutch/trunk/conf/nutch-default.xml Thu Jan 21 13:58:56 2016
@@ -1951,4 +1951,112 @@ CAUTION: Set the parser.timeout to -1 or
   </description>
 </property>
 
+<!-- HostDB settings -->
+<property>
+  <name>hostdb.recheck.interval</name>
+  <value>86400000</value>
+  <description>
+    Interval between rechecks in milliseconds. Default is one day. Recheck
+    interval is multiplied by the number of DNS lookup failures for a given
+    host.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.purge.failed.hosts.threshold</name>
+  <value>3</value>
+  <description>
+    If hosts have more failed DNS lookups than this threshold, they are
+    removed from the HostDB. Hosts can, of course, return if they are still
+    present in the CrawlDB.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.num.resolvers.threads</name>
+  <value>25</value>
+  <description>
+    Number of resolver threads per reducer. Make sure your DNS resolver is
+    capable of handling this value multiplied by the number of reducers.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.check.failed</name>
+  <value>true</value>
+  <description>
+    True if hosts for which DNS lookup failed are eligible for recheck. If
+    false, hosts that failed DNS lookup more than 0 times are not eligible
+    for DNS lookup.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.check.new</name>
+  <value>true</value>
+  <description>
+    True if newly discovered hosts are eligible for DNS lookup check. If false,
+    hosts that are just added to the HostDB are not eligible for DNS lookup.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.check.known</name>
+  <value>true</value>
+  <description>
+    True if already known hosts are eligible for DNS lookup check. If false,
+    known hosts are not eligible for DNS lookup.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.force.check</name>
+  <value>false</value>
+  <description>
+    If true hosts are checked regardless of their respective recheck
+    intervals or status.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.url.filter</name>
+  <value>false</value>
+  <description>
+    Whether the records are to be passed through configured filters.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.url.normalize</name>
+  <value>false</value>
+  <description>
+    Whether the records are to be passed through configured normalizers.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.numeric.fields</name>
+  <value>_rs_</value>
+  <description>
+    Comma-separated list of CrawlDatum metadata fields for which aggregations are needed.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.string.fields</name>
+  <value>Content-Type</value>
+  <description>
+    Comma-separated list of CrawlDatum metadata fields for which sums are needed.
+  </description>
+</property>
+
+<property>
+  <name>hostdb.percentiles</name>
+  <value>50,75,95,99</value>
+  <description>
+    Comma-separated list of percentiles that must be calculated for all numeric
+    field aggregations. Host metadata will contain fields for each percentile.
+  </description>
+</property>
+
 </configuration>

Modified: nutch/trunk/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/trunk/ivy/ivy.xml?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/ivy/ivy.xml (original)
+++ nutch/trunk/ivy/ivy.xml Thu Jan 21 13:58:56 2016
@@ -49,6 +49,7 @@
 		<dependency org="commons-codec" name="commons-codec" rev="1.10" conf="*->default" />
         <dependency org="org.apache.commons" name="commons-compress" rev="1.9" conf="*->default" />
         <dependency org="org.apache.commons" name="commons-jexl" rev="2.1.1" />
+        <dependency org="com.tdunning" name="t-digest" rev="3.1" />
             
         <!-- Hadoop Dependencies -->
 		<dependency org="org.apache.hadoop" name="hadoop-common" rev="2.4.0" conf="*->default">

Modified: nutch/trunk/src/bin/nutch
URL: http://svn.apache.org/viewvc/nutch/trunk/src/bin/nutch?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/src/bin/nutch (original)
+++ nutch/trunk/src/bin/nutch Thu Jan 21 13:58:56 2016
@@ -90,7 +90,9 @@ if [ $# = 0 ]; then
   echo "  junit             runs the given JUnit test"
   echo "  startserver       runs the Nutch Server on localhost:8081"
   echo "  webapp            run a local Nutch Web Application on locahost:8080"
-  echo "  warc	            exports crawled data from segments at the WARC format"
+  echo "  warc              exports crawled data from segments at the WARC format"
+  echo "  updatehostdb      update the host db with records from the crawl db"
+  echo "  readhostdb        read / dump host db"
   echo " or"
   echo "  CLASSNAME         run the class named CLASSNAME"
   echo "Most commands print help when invoked w/o parameters."
@@ -290,6 +292,10 @@ elif [ "$COMMAND" = "webapp" ] ; then
   CLASS=org.apache.nutch.webui.NutchUiServer
 elif [ "$COMMAND" = "warc" ] ; then
   CLASS=org.apache.nutch.tools.warc.WARCExporter
+elif [ "$COMMAND" = "updatehostdb" ] ; then
+  CLASS=org.apache.nutch.hostdb.UpdateHostDb
+elif [ "$COMMAND" = "readhostdb" ] ; then
+  CLASS=org.apache.nutch.hostdb.ReadHostDb
 else
   CLASS=$COMMAND
 fi

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java?rev=1725952&r1=1725951&r2=1725952&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/NutchWritable.java Thu Jan 21 13:58:56 2016
@@ -47,7 +47,8 @@ public class NutchWritable extends Gener
         org.apache.nutch.parse.ParseStatus.class,
         org.apache.nutch.protocol.Content.class,
         org.apache.nutch.protocol.ProtocolStatus.class,
-        org.apache.nutch.scoring.webgraph.LinkDatum.class };
+        org.apache.nutch.scoring.webgraph.LinkDatum.class,
+        org.apache.nutch.hostdb.HostDatum.class };
   }
 
   public NutchWritable() {

Added: nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/HostDatum.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map.Entry;
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
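+/** Writable per-host record: score, last check date, homepage URL, DNS/connection failure counts, per-status record counts and metadata.
+ */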
+public class HostDatum implements Writable, Cloneable {
+  protected int failures = 0;
+  protected float score = 0;
+  protected Date lastCheck = new Date(0);
+  protected String homepageUrl = new String();
+
+  protected MapWritable metaData = new MapWritable();
+
+  // Records the number of times DNS look-up failed, may indicate host no longer exists
+  protected int dnsFailures = 0;
+
+  // Records the number of connection failures, may indicate our network being blocked by firewall
+  protected int connectionFailures = 0;
+
+  protected int unfetched = 0;
+  protected int fetched = 0;
+  protected int notModified = 0;
+  protected int redirTemp = 0;
+  protected int redirPerm = 0;
+  protected int gone = 0;
+
+  public HostDatum() {
+  }
+
+  public HostDatum(float score) {
+    this(score, new Date());
+  }
+
+  public HostDatum(float score, Date lastCheck) {
+    this(score, lastCheck, new String());
+  }
+
+  public HostDatum(float score, Date lastCheck, String homepageUrl) {
+    this.score =  score;
+    this.lastCheck = lastCheck;
+    this.homepageUrl = homepageUrl;
+  }
+
+  public void resetFailures() {
+    setDnsFailures(0);
+    setConnectionFailures(0);
+  }
+
+  public void setDnsFailures(Integer dnsFailures) {
+    this.dnsFailures = dnsFailures;
+  }
+
+  public void setConnectionFailures(Integer connectionFailures) {
+    this.connectionFailures = connectionFailures;
+  }
+
+  public void incDnsFailures() {
+    this.dnsFailures++;
+  }
+
+  public void incConnectionFailures() {
+    this.connectionFailures++;
+  }
+
+  public Integer numFailures() {
+    return getDnsFailures() + getConnectionFailures();
+  }
+
+  public Integer getDnsFailures() {
+    return dnsFailures;
+  }
+
+  public Integer getConnectionFailures() {
+    return connectionFailures;
+  }
+
+  public void setScore(float score) {
+    this.score = score;
+  }
+
+  public void setLastCheck() {
+    setLastCheck(new Date());
+  }
+
+  public void setLastCheck(Date date) {
+    lastCheck = date;
+  }
+
+  public boolean isEmpty() {
+    return (lastCheck.getTime() == 0) ? true : false;
+  }
+
+  public float getScore() {
+    return score;
+  }
+
+  public Integer numRecords() {
+    return unfetched + fetched + gone + redirPerm + redirTemp + notModified;
+  }
+
+  public Date getLastCheck() {
+    return lastCheck;
+  }
+
+  public boolean hasHomepageUrl() {
+    return homepageUrl.length() > 0;
+  }
+
+  public String getHomepageUrl() {
+    return homepageUrl;
+  }
+
+  public void setHomepageUrl(String homepageUrl) {
+    this.homepageUrl = homepageUrl;
+  }
+
+  public void setUnfetched(int val) {
+    unfetched = val;
+  }
+
+  public int getUnfetched() {
+    return unfetched;
+  }
+
+  public void setFetched(int val) {
+    fetched = val;
+  }
+
+  public int getFetched() {
+    return fetched;
+  }
+
+  public void setNotModified(int val) {
+    notModified = val;
+  }
+
+  public int getNotModified() {
+    return notModified;
+  }
+
+  public void setRedirTemp(int val) {
+    redirTemp = val;
+  }
+
+  public int getRedirTemp() {
+    return redirTemp;
+  }
+
+  public void setRedirPerm(int val) {
+    redirPerm = val;
+  }
+
+  public int getRedirPerm() {
+    return redirPerm;
+  }
+
+  public void setGone(int val) {
+    gone = val;
+  }
+
+  public int getGone() {
+    return gone;
+  }
+
+  public void resetStatistics() {
+    setUnfetched(0);
+    setFetched(0);
+    setGone(0);
+    setRedirTemp(0);
+    setRedirPerm(0);
+    setNotModified(0);
+  }
+
+   public void setMetaData(org.apache.hadoop.io.MapWritable mapWritable) {
+     this.metaData = new org.apache.hadoop.io.MapWritable(mapWritable);
+   }
+
+   /**
+    * Add all metadata from the other HostDatum to this HostDatum.
+    *
+    * @param other HostDatum
+    */
+   public void putAllMetaData(HostDatum other) {
+     for (Entry<Writable, Writable> e : other.getMetaData().entrySet()) {
+       getMetaData().put(e.getKey(), e.getValue());
+     }
+   }
+
+  /**
+   * Returns the MapWritable that was set or read in {@link #readFields(DataInput)};
+   * returns an empty map if this HostDatum was freshly created (lazily instantiated).
+   */
+  public org.apache.hadoop.io.MapWritable getMetaData() {
+    if (this.metaData == null) this.metaData = new org.apache.hadoop.io.MapWritable();
+    return this.metaData;
+  }
+
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    HostDatum result = (HostDatum)super.clone();
+    result.score = score;
+    result.lastCheck = lastCheck;
+    result.homepageUrl = homepageUrl;
+
+    result.dnsFailures = dnsFailures;
+    result.connectionFailures = connectionFailures;
+
+    result.unfetched = unfetched;
+    result.fetched = fetched;
+    result.notModified = notModified;
+    result.redirTemp = redirTemp;
+    result.redirPerm = redirPerm;
+    result.gone = gone;
+
+    result.metaData = metaData;
+
+    return result;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    score = in.readFloat();
+    lastCheck = new Date(in.readLong());
+    homepageUrl = Text.readString(in);
+
+    dnsFailures = in.readInt();
+    connectionFailures = in.readInt();
+
+    unfetched= in.readInt();
+    fetched= in.readInt();
+    notModified= in.readInt();
+    redirTemp= in.readInt();
+    redirPerm = in.readInt();
+    gone = in.readInt();
+
+    metaData = new org.apache.hadoop.io.MapWritable();
+    metaData.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeFloat(score);
+    out.writeLong(lastCheck.getTime());
+    Text.writeString(out, homepageUrl);
+
+    out.writeInt(dnsFailures);
+    out.writeInt(connectionFailures);
+
+    out.writeInt(unfetched);
+    out.writeInt(fetched);
+    out.writeInt(notModified);
+    out.writeInt(redirTemp);
+    out.writeInt(redirPerm);
+    out.writeInt(gone);
+
+    metaData.write(out);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append(Integer.toString(getUnfetched()));
+    buf.append("\t");
+    buf.append(Integer.toString(getFetched()));
+    buf.append("\t");
+    buf.append(Integer.toString(getGone()));
+    buf.append("\t");
+    buf.append(Integer.toString(getRedirTemp()));
+    buf.append("\t");
+    buf.append(Integer.toString(getRedirPerm()));
+    buf.append("\t");
+    buf.append(Integer.toString(getNotModified()));
+    buf.append("\t");
+    buf.append(Integer.toString(numRecords()));
+    buf.append("\t");
+    buf.append(Integer.toString(getDnsFailures()));
+    buf.append("\t");
+    buf.append(Integer.toString(getConnectionFailures()));
+    buf.append("\t");
+    buf.append(Integer.toString(numFailures()));
+    buf.append("\t");
+    buf.append(Float.toString(score));
+    buf.append("\t");
+    buf.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastCheck));
+    buf.append("\t");
+    buf.append(homepageUrl);
+    buf.append("\t");
+    for (Entry<Writable, Writable> e : getMetaData().entrySet()) {
+      buf.append(e.getKey().toString());
+      buf.append(':');
+      buf.append(e.getValue().toString());
+      buf.append("|||");
+    }
+    return buf.toString();
+  }
+}
\ No newline at end of file

Added: nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/ReadHostDb.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.MapContext;
+
+/**
+ * @see <a href="http://commons.apache.org/proper/commons-jexl/reference/syntax.html">JEXL syntax reference</a>
+ */
+public class ReadHostDb extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ReadHostDb.class);
+
+  public static final String HOSTDB_DUMP_HOSTNAMES = "hostdb.dump.hostnames";
+  public static final String HOSTDB_DUMP_HOMEPAGES = "hostdb.dump.homepages";
+  public static final String HOSTDB_FILTER_EXPRESSION = "hostdb.filter.expression";
+
+  static class ReadHostDbMapper extends Mapper<Text, HostDatum, Text, Text> {
+    protected boolean dumpHostnames = false;
+    protected boolean dumpHomepages = false;
+    protected Text emptyText = new Text();
+    protected Expression expr = null;
+
+    public void setup(Context context) {
+      dumpHomepages = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOMEPAGES, false);
+      dumpHostnames = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOSTNAMES, false);
+      String expr = context.getConfiguration().get(HOSTDB_FILTER_EXPRESSION);
+      if (expr != null) {
+        // Create or retrieve a JexlEngine
+        JexlEngine jexl = new JexlEngine();
+        
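+        // Silent and strict mode. NOTE(review): per the JEXL docs silent=true logs evaluation errors instead of throwing them - confirm this is intended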
+        jexl.setSilent(true);
+        jexl.setStrict(true);
+        
+        // Create an expression object
+        this.expr = jexl.createExpression(expr);
+      }
+    }
+
+    public void map(Text key, HostDatum datum, Context context) throws IOException, InterruptedException {     
+      if (expr != null) {
+        // Create a context and add data
+        JexlContext jcontext = new MapContext();
+        
+        // Set some fixed variables
+        jcontext.set("unfetched", datum.getUnfetched());
+        jcontext.set("fetched", datum.getFetched());
+        jcontext.set("gone", datum.getGone());
+        jcontext.set("redirTemp", datum.getRedirTemp());
+        jcontext.set("redirPerm", datum.getRedirPerm());
+        jcontext.set("redirs", datum.getRedirPerm() + datum.getRedirTemp());
+        jcontext.set("notModified", datum.getNotModified());
+        jcontext.set("ok", datum.getFetched() + datum.getNotModified());
+        jcontext.set("numRecords", datum.numRecords());
+        jcontext.set("dnsFailures", datum.getDnsFailures());
+        jcontext.set("connectionFailures", datum.getConnectionFailures());
+        
+        // Set metadata variables
+        for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) {
+          Object value = entry.getValue();
+          
+          if (value instanceof FloatWritable) {
+            FloatWritable fvalue = (FloatWritable)value;
+            Text tkey = (Text)entry.getKey();
+            jcontext.set(tkey.toString(), fvalue.get());
+          }
+          
+          if (value instanceof IntWritable) {
+            IntWritable ivalue = (IntWritable)value;
+            Text tkey = (Text)entry.getKey();
+            jcontext.set(tkey.toString(), ivalue.get());
+          }
+        }
+        
+        // Filter this record if evaluation did not pass
+        try {
+          if (!Boolean.TRUE.equals(expr.evaluate(jcontext))) {
+            return;
+          }
+        } catch (Exception e) {
+          LOG.info(e.toString() + " for " + key.toString());
+        }
+      }
+      
+      if (dumpHomepages) {
+        if (datum.hasHomepageUrl()) {
+          context.write(new Text(datum.getHomepageUrl()), emptyText);
+        }
+        return;
+      }
+      
+      if (dumpHostnames) {
+        context.write(key, emptyText);
+        return;
+      }
+      
+      // Write anyway
+      context.write(key, new Text(datum.toString()));
+    }
+  }
+
+  // Todo, reduce unknown hosts to single unknown domain if possible. Enable via configuration
+  // host_a.example.org,host_b.example.org ==> example.org
+//   static class ReadHostDbReducer extends Reduce<Text, Text, Text, Text> {
+//     public void setup(Context context) { }
+//
+//     public void reduce(Text domain, Iterable<Text> hosts, Context context) throws IOException, InterruptedException {
+//
+//     }
+//   }
+
+  private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean dumpHostnames, String expr) throws Exception {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("ReadHostDb: starting at " + sdf.format(start));
+
+    Configuration conf = getConf();
+    conf.setBoolean(HOSTDB_DUMP_HOMEPAGES, dumpHomepages);
+    conf.setBoolean(HOSTDB_DUMP_HOSTNAMES, dumpHostnames);
+    if (expr != null) {
+      conf.set(HOSTDB_FILTER_EXPRESSION, expr);
+    }
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    conf.set("mapred.textoutputformat.separator", "\t");
+    
+    Job job = new Job(conf, "ReadHostDb");
+    job.setJarByClass(ReadHostDb.class);
+
+    FileInputFormat.addInputPath(job, new Path(hostDb, "current"));
+    FileOutputFormat.setOutputPath(job, output);
+
+    job.setJarByClass(ReadHostDb.class);
+    job.setMapperClass(ReadHostDbMapper.class);
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setNumReduceTasks(0);
+
+    try {
+      job.waitForCompletion(true);
+    } catch (Exception e) {
+      throw e;
+    }
+
+    long end = System.currentTimeMillis();
+    LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+  }
+
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new ReadHostDb(), args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: ReadHostDb <hostdb> <output> [-dumpHomepages | -dumpHostnames | -expr <expr.>]");
+      return -1;
+    }
+
+    boolean dumpHomepages = false;
+    boolean dumpHostnames = false;
+    String expr = null;
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-dumpHomepages")) {
+        LOG.info("ReadHostDb: dumping homepage URL's");
+        dumpHomepages = true;
+      }
+      if (args[i].equals("-dumpHostnames")) {
+        LOG.info("ReadHostDb: dumping hostnames");
+        dumpHostnames = true;
+      }
+      if (args[i].equals("-expr")) {
+        expr = args[i + 1];
+        LOG.info("ReadHostDb: evaluating expression: " + expr);
+        i++;
+      }
+    }
+
+    try {
+      readHostDb(new Path(args[0]), new Path(args[1]), dumpHomepages, dumpHostnames, expr);
+      return 0;
+    } catch (Exception e) {
+      LOG.error("ReadHostDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
\ No newline at end of file

Added: nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/ResolverThread.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple runnable that performs DNS lookup for a single host.
+ */
+public class ResolverThread implements Runnable {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ResolverThread.class);
+
+  protected String host = null;
+  protected HostDatum datum = null;
+  protected Text hostText = new Text();
+  protected OutputCollector<Text,HostDatum> output;
+  protected Reporter reporter;
+  protected int purgeFailedHostsThreshold;
+
+  /**
+   * Constructor.
+   */
+  public ResolverThread(String host, HostDatum datum,
+    OutputCollector<Text,HostDatum> output, Reporter reporter, int purgeFailedHostsThreshold) {
+
+    hostText.set(host);
+    this.host = host;
+    this.datum = datum;
+    this.output = output;
+    this.reporter = reporter;
+    this.purgeFailedHostsThreshold = purgeFailedHostsThreshold;
+  }
+
+  /**
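+   * Resolves the host via DNS, updates the datum and the job counters, and emits the datum unless the host is purged.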
+   */
+  public void run() {
+    // Resolve the host and act appropriately
+    try {
+      // Throws an exception if host is not found
+      InetAddress inetAddr = InetAddress.getByName(host);
+
+      if (datum.isEmpty()) {
+        reporter.incrCounter("UpdateHostDb", "new_known_host" ,1);
+        datum.setLastCheck();
+        LOG.info(host + ": new_known_host " + datum);
+      } else if (datum.getDnsFailures() > 0) {
+        reporter.incrCounter("UpdateHostDb", "rediscovered_host" ,1);
+        datum.setLastCheck();
+        datum.setDnsFailures(0);
+        LOG.info(host + ": rediscovered_host " + datum);
+      } else {
+        reporter.incrCounter("UpdateHostDb", "existing_known_host", 1);
+        datum.setLastCheck();
+        LOG.info(host + ": existing_known_host " + datum);
+      }
+
+      // Write the host datum
+      output.collect(hostText, datum);
+    } catch (UnknownHostException e) {
+      try {
+        // If the counter is empty we'll initialize with date = today and 1 failure
+        if (datum.isEmpty()) {
+          datum.setLastCheck();
+          datum.setDnsFailures(1);
+          output.collect(hostText, datum);
+          reporter.incrCounter("UpdateHostDb", "new_unknown_host", 1);
+          LOG.info(host + ": new_unknown_host " + datum);
+        } else {
+          datum.setLastCheck();
+          datum.incDnsFailures();
+
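+          // Decide whether to keep or forget this host. NOTE(review): the test below keeps hosts whose DNS failures exceed the threshold and purges the rest, which looks inverted versus the hostdb.purge.failed.hosts.threshold description - confirm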
+          if (purgeFailedHostsThreshold == -1 ||
+            purgeFailedHostsThreshold < datum.getDnsFailures()) {
+
+            output.collect(hostText, datum);
+            reporter.incrCounter("UpdateHostDb", "existing_unknown_host" ,1);
+            LOG.info(host + ": existing_unknown_host " + datum);
+          } else {
+            reporter.incrCounter("UpdateHostDb", "purged_unknown_host" ,1);
+            LOG.info(host + ": purged_unknown_host " + datum);
+          }
+        }
+
+        reporter.incrCounter("UpdateHostDb",
+          Integer.toString(datum.numFailures()) + "_times_failed", 1);
+      } catch (Exception ioe) {
+        LOG.warn(StringUtils.stringifyException(ioe));
+      }
+    } catch (Exception e) {
+      LOG.warn(StringUtils.stringifyException(e));
+    }
+    
+    reporter.incrCounter("UpdateHostDb", "checked_hosts", 1);
+  }
+}
\ No newline at end of file

Added: nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDb.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tool to create a HostDB from the CrawlDB. It aggregates fetch status values
+ * by host and checks DNS entries for hosts.
+ */
+public class UpdateHostDb extends Configured implements Tool {
+
+  public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDb.class);
+  // Name of the lock file created inside the hostdb directory while an update runs
+  public static final String LOCK_NAME = ".locked";
+
+  // Configuration keys shared by this tool, UpdateHostDbMapper and UpdateHostDbReducer.
+  // DNS failure threshold handed to the resolver threads; the reducer defaults it to -1
+  public static final String HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD = "hostdb.purge.failed.hosts.threshold";
+  // Size of the reducer's resolver thread pool (reducer default: 10)
+  public static final String HOSTDB_NUM_RESOLVER_THREADS = "hostdb.num.resolvers.threads";
+  // Recheck interval; the reducer reads it as an int (default 86400) and
+  // multiplies by 1000, so the configured unit is presumably seconds
+  public static final String HOSTDB_RECHECK_INTERVAL = "hostdb.recheck.interval";
+  // Set from the -checkFailed / -checkAll command line flags
+  public static final String HOSTDB_CHECK_FAILED = "hostdb.check.failed";
+  // Set from the -checkNew / -checkAll command line flags
+  public static final String HOSTDB_CHECK_NEW = "hostdb.check.new";
+  // Set from the -checkKnown / -checkAll command line flags
+  public static final String HOSTDB_CHECK_KNOWN = "hostdb.check.known";
+  // Set from the -force command line flag
+  public static final String HOSTDB_FORCE_CHECK = "hostdb.force.check";
+  // Set from the -filter flag; makes the mapper run URL filters on hosts
+  public static final String HOSTDB_URL_FILTERING = "hostdb.url.filter";
+  // Set from the -normalize flag; makes the mapper run URL normalizers on hosts
+  public static final String HOSTDB_URL_NORMALIZING = "hostdb.url.normalize";
+  // CrawlDatum metadata fields the reducer aggregates as numbers (min/max/avg/percentiles)
+  public static final String HOSTDB_NUMERIC_FIELDS = "hostdb.numeric.fields";
+  // CrawlDatum metadata fields the reducer aggregates as per-value counts
+  public static final String HOSTDB_STRING_FIELDS = "hostdb.string.fields";
+  // Integer percentiles the reducer emits for each numeric field
+  public static final String HOSTDB_PERCENTILES = "hostdb.percentiles";
+  
+  /**
+   * Configures and runs the HostDB update job and installs its output as the
+   * new <code>current</code> HostDB, keeping the previous one as
+   * <code>old</code> unless <code>db.preserve.backup</code> is false.
+   *
+   * The HostDB is locked for the duration of the update. The lock is always
+   * released again once it has been acquired, and the temporary job output is
+   * removed when anything fails.
+   *
+   * @param hostDb root directory of the HostDB
+   * @param crawlDb optional CrawlDB to aggregate statistics from, may be null
+   * @param topHosts optional plain text key/value file with host scores, may be null
+   * @param checkFailed value stored under {@link #HOSTDB_CHECK_FAILED}
+   * @param checkNew value stored under {@link #HOSTDB_CHECK_NEW}
+   * @param checkKnown value stored under {@link #HOSTDB_CHECK_KNOWN}
+   * @param force value stored under {@link #HOSTDB_FORCE_CHECK}
+   * @param filter whether hosts are passed through the URL filters
+   * @param normalize whether hosts are passed through the URL normalizers
+   * @throws Exception if a conflicting plugin is enabled, the lock cannot be
+   *         obtained or the job fails
+   */
+  private void updateHostDb(Path hostDb, Path crawlDb, Path topHosts,
+    boolean checkFailed, boolean checkNew, boolean checkKnown,
+    boolean force, boolean filter, boolean normalize) throws Exception {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("UpdateHostDb: starting at " + sdf.format(start));
+
+    JobConf job = new NutchJob(getConf());
+    boolean preserveBackup = job.getBoolean("db.preserve.backup", true);
+    job.setJarByClass(UpdateHostDb.class);
+    job.setJobName("UpdateHostDb");
+
+    // plugin.includes is used as a regular expression over plugin ids. Fall
+    // back to an empty pattern (matches no plugin id) when it is not set, so
+    // the checks below cannot fail with a NullPointerException.
+    String pluginIncludes = job.get("plugin.includes", "");
+
+    // Check whether the urlfilter-domainblacklist plugin is loaded
+    if (filter && "urlfilter-domainblacklist".matches(pluginIncludes)) {
+      throw new Exception("domainblacklist-urlfilter must not be enabled");
+    }
+
+    // Check whether the urlnormalizer-host plugin is loaded
+    if (normalize && "urlnormalizer-host".matches(pluginIncludes)) {
+      throw new Exception("urlnormalizer-host must not be enabled");
+    }
+
+    FileSystem fs = FileSystem.get(job);
+    Path old = new Path(hostDb, "old");
+    Path current = new Path(hostDb, "current");
+    Path tempHostDb = new Path(hostDb, "hostdb-"
+      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+    // lock an existing hostdb to prevent multiple simultaneous updates
+    Path lock = new Path(hostDb, LOCK_NAME);
+    if (!fs.exists(current)) {
+      fs.mkdirs(current);
+    }
+    LockUtil.createLockFile(fs, lock, false);
+
+    // Everything after this point runs while we own the lock, so it all lives
+    // inside the try block and the lock is released in finally.
+    try {
+      MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
+
+      if (topHosts != null) {
+        MultipleInputs.addInputPath(job, topHosts, KeyValueTextInputFormat.class);
+      }
+      if (crawlDb != null) {
+        // Tell the job we read from CrawlDB
+        job.setBoolean("hostdb.reading.crawldb", true);
+        MultipleInputs.addInputPath(job, new Path(crawlDb,
+          CrawlDb.CURRENT_NAME), SequenceFileInputFormat.class);
+      }
+
+      FileOutputFormat.setOutputPath(job, tempHostDb);
+
+      job.setOutputFormat(SequenceFileOutputFormat.class);
+
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(NutchWritable.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(HostDatum.class);
+      job.setMapperClass(UpdateHostDbMapper.class);
+      job.setReducerClass(UpdateHostDbReducer.class);
+
+      job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+      job.setSpeculativeExecution(false);
+      job.setBoolean(HOSTDB_CHECK_FAILED, checkFailed);
+      job.setBoolean(HOSTDB_CHECK_NEW, checkNew);
+      job.setBoolean(HOSTDB_CHECK_KNOWN, checkKnown);
+      job.setBoolean(HOSTDB_FORCE_CHECK, force);
+      job.setBoolean(HOSTDB_URL_FILTERING, filter);
+      job.setBoolean(HOSTDB_URL_NORMALIZING, normalize);
+      job.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+      JobClient.runJob(job);
+
+      // Rotate: current becomes old, the fresh job output becomes current
+      FSUtils.replace(fs, old, current, true);
+      FSUtils.replace(fs, current, tempHostDb, true);
+
+      if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
+    } catch (Exception e) {
+      // Do not leave a half written temporary hostdb behind
+      if (fs.exists(tempHostDb)) {
+        fs.delete(tempHostDb, true);
+      }
+      throw e;
+    } finally {
+      LockUtil.removeLockFile(fs, lock);
+    }
+
+    long end = System.currentTimeMillis();
+    LOG.info("UpdateHostDb: finished at " + sdf.format(end) +
+      ", elapsed: " + TimingUtil.elapsedTime(start, end));
+  }
+
+  /**
+   * Command line entry point: runs the tool through {@link ToolRunner} with a
+   * Nutch configuration and exits the JVM with the tool's return code.
+   *
+   * @param args command line arguments, see {@link #run(String[])}
+   * @throws Exception if the tool runner fails
+   */
+  public static void main(String[] args) throws Exception {
+    System.exit(
+      ToolRunner.run(NutchConfiguration.create(), new UpdateHostDb(), args));
+  }
+
+  /**
+   * Parses the command line and runs the HostDB update.
+   *
+   * Options taking a path (<code>-hostdb</code>, <code>-crawldb</code>,
+   * <code>-tophosts</code>) consume the following argument; all other options
+   * are plain flags. Unrecognized arguments are ignored.
+   *
+   * @param args command line arguments
+   * @return 0 on success, -1 on a usage error or when the update fails
+   * @throws Exception declared by the {@link Tool} interface; failures of the
+   *         update itself are logged and reported through the return value
+   */
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println("Usage: UpdateHostDb -hostdb <hostdb> " +
+        "[-tophosts <tophosts>] [-crawldb <crawldb>] [-checkAll] [-checkFailed]" +
+        " [-checkNew] [-checkKnown] [-force] [-filter] [-normalize]");
+      return -1;
+    }
+
+    Path hostDb = null;
+    Path crawlDb = null;
+    Path topHosts = null;
+
+    boolean checkFailed = false;
+    boolean checkNew = false;
+    boolean checkKnown = false;
+    boolean force = false;
+
+    boolean filter = false;
+    boolean normalize = false;
+
+    for (int i = 0; i < args.length; i++) {
+      String arg = args[i];
+
+      if (arg.equals("-hostdb") || arg.equals("-crawldb")
+        || arg.equals("-tophosts")) {
+        // A path option as the very last argument has no value to consume;
+        // report it instead of running past the end of the array
+        if (i + 1 >= args.length) {
+          System.err.println("UpdateHostDb: option " + arg + " requires a path");
+          return -1;
+        }
+
+        // Consume the value so it is never mistaken for a flag
+        Path path = new Path(args[++i]);
+
+        if (arg.equals("-hostdb")) {
+          hostDb = path;
+          LOG.info("UpdateHostDb: hostdb: " + hostDb);
+        } else if (arg.equals("-crawldb")) {
+          crawlDb = path;
+          LOG.info("UpdateHostDb: crawldb: " + crawlDb);
+        } else {
+          topHosts = path;
+          LOG.info("UpdateHostDb: tophosts: " + topHosts);
+        }
+      } else if (arg.equals("-checkFailed")) {
+        LOG.info("UpdateHostDb: checking failed hosts");
+        checkFailed = true;
+      } else if (arg.equals("-checkNew")) {
+        LOG.info("UpdateHostDb: checking new hosts");
+        checkNew = true;
+      } else if (arg.equals("-checkKnown")) {
+        LOG.info("UpdateHostDb: checking known hosts");
+        checkKnown = true;
+      } else if (arg.equals("-checkAll")) {
+        LOG.info("UpdateHostDb: checking all hosts");
+        checkFailed = true;
+        checkNew = true;
+        checkKnown = true;
+      } else if (arg.equals("-force")) {
+        LOG.info("UpdateHostDb: forced check");
+        force = true;
+      } else if (arg.equals("-filter")) {
+        LOG.info("UpdateHostDb: filtering enabled");
+        filter = true;
+      } else if (arg.equals("-normalize")) {
+        LOG.info("UpdateHostDb: normalizing enabled");
+        normalize = true;
+      }
+    }
+
+    if (hostDb == null) {
+      System.err.println("hostDb is mandatory");
+      return -1;
+    }
+
+    try {
+      updateHostDb(hostDb, crawlDb, topHosts, checkFailed, checkNew,
+        checkKnown, force, filter, normalize);
+
+      return 0;
+    } catch (Exception e) {
+      LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+}
\ No newline at end of file

Added: nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.util.URLUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper ingesting HostDB and CrawlDB entries. Additionally it can also read
+ * host score info from a plain text key/value file generated by the
+ * Webgraph's NodeDumper tool.
+ */
+public class UpdateHostDbMapper
+  implements Mapper<Text, Writable, Text, NutchWritable> {
+  
+  public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbMapper.class);
+  // Output key set by map() for records coming from the CrawlDB
+  protected Text host = new Text();
+  // Scratch fields assigned by map() for the record currently being processed
+  protected HostDatum hostDatum = null;
+  protected CrawlDatum crawlDatum = null;
+  // Redirect target taken from the protocol status of a redirecting homepage
+  protected String reprUrl = null;
+  // Host name returned by filterNormalize(), null when the record was filtered
+  protected String buffer = null;
+  // Arguments of the protocol status of a redirecting homepage
+  protected String[] args = null;
+  // Whether hosts are passed through the URL filters / normalizers (see configure())
+  protected boolean filter = false;
+  protected boolean normalize = false;
+  // True when the job also reads the CrawlDB; HostDB statistics are then reset in map()
+  protected boolean readingCrawlDb = false;
+  // Only created in configure() when filtering / normalizing is enabled
+  protected URLFilters filters = null;
+  protected URLNormalizers normalizers = null;
+
+  /**
+   * Nothing to release; the mapper holds no resources that need closing.
+   */
+  public void close() {}
+
+  /**
+   * Reads the mapper settings from the job and creates the URL filters and
+   * normalizers, each only when the corresponding option is enabled.
+   *
+   * @param job the job configuration
+   */
+  public void configure(JobConf job) {
+    normalize = job.getBoolean(UpdateHostDb.HOSTDB_URL_NORMALIZING, false);
+    filter = job.getBoolean(UpdateHostDb.HOSTDB_URL_FILTERING, false);
+    readingCrawlDb = job.getBoolean("hostdb.reading.crawldb", false);
+
+    if (filter) {
+      filters = new URLFilters(job);
+    }
+
+    if (normalize) {
+      normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_DEFAULT);
+    }
+  }
+
+  /**
+   * Normalizes and/or filters a host name, depending on which of the two is
+   * enabled. The host is wrapped into a <code>http://host/</code> URL for the
+   * normalizers and filters and turned back into a host name afterwards.
+   *
+   * @param url the host name to process, may be null
+   * @return the resulting host name, or null if the input was null, the host
+   *         was filtered out, or normalizing/filtering failed
+   */
+  protected String filterNormalize(String url) {
+    // Without this guard string concatenation would turn a null host into the
+    // literal host name "null", which would then be treated as a real host
+    if (url == null) {
+      return null;
+    }
+
+    // We actually receive a hostname here so let's make a URL
+    // TODO: we force shop.fcgroningen to be https, how do we know that here?
+    // http://issues.openindex.io/browse/SPIDER-40
+    url = "http://" + url + "/";
+
+    try {
+      if (normalize) {
+        url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
+      }
+      // Nothing left to filter if the normalizers already rejected the URL
+      if (filter && url != null) {
+        url = filters.filter(url);
+      }
+      if (url == null) {
+        return null;
+      }
+    } catch (Exception e) {
+      // A failing normalizer or filter is treated like a rejected host
+      return null;
+    }
+
+    // Turn back to host
+    return URLUtil.getHost(url);
+  }
+
+  /**
+   * Mapper ingesting records from the HostDB, CrawlDB and plaintext host
+   * scores file. Statistics and scores are passed on, keyed by host name.
+   *
+   * <ul>
+   * <li>CrawlDB records ({@link CrawlDatum}) are emitted under their host. If
+   * the record is the fetched homepage of its host, a {@link HostDatum}
+   * carrying the homepage URL (or its redirect target) is emitted too.</li>
+   * <li>HostDB records ({@link HostDatum}) are passed on; their statistics are
+   * reset when the job also reads the CrawlDB.</li>
+   * <li>Score records ({@link Text}) are parsed as float and passed on.</li>
+   * </ul>
+   *
+   * Records whose host is rejected by {@link #filterNormalize(String)} are
+   * counted as <code>filtered_records</code> and dropped.
+   *
+   * @param key the URL (CrawlDB) or host name (HostDB, scores)
+   * @param value a CrawlDatum, HostDatum or Text score
+   * @param output collector receiving the host keyed records
+   * @param reporter used for the job counters
+   * @throws IOException if writing to the collector fails
+   */
+  public void map(Text key, Writable value,
+    OutputCollector<Text,NutchWritable> output, Reporter reporter)
+    throws IOException {
+
+    // Get the key!
+    String keyStr = key.toString();
+
+    // Check if we process records from the CrawlDB
+    if (value instanceof CrawlDatum) {
+      // Get the normalized and filtered host of this URL. A URL without a
+      // usable host is dropped like a filtered one instead of being
+      // aggregated under the bogus host name "null".
+      String urlHost = URLUtil.getHost(keyStr);
+      buffer = (urlHost == null) ? null : filterNormalize(urlHost);
+
+      // Filtered out?
+      if (buffer == null) {
+        reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+        LOG.info("UpdateHostDb: " + urlHost + " crawldatum has been filtered");
+        return;
+      }
+
+      // Set the host of this URL
+      host.set(buffer);
+      crawlDatum = (CrawlDatum)value;
+      hostDatum = new HostDatum();
+
+      /**
+        * TODO: fix multi redirects: host_a => host_b/page => host_c/page/whatever
+        * http://www.ferienwohnung-armbruster.de/
+        * http://www.ferienwohnung-armbruster.de/website/
+        * http://www.ferienwohnung-armbruster.de/website/willkommen.php
+        *
+        * We cannot reresolve redirects for host objects as CrawlDatum metadata is
+        * not available. We also cannot reliably use the reducer in all cases
+        * since redirects may be across hosts or even domains. The example
+        * above has redirects that will end up in the same reducer. During that
+        * phase, however, we do not know which URL redirects to the next URL.
+        */
+      // Do not resolve homepages when the root URL is unfetched
+      if (crawlDatum.getStatus() != CrawlDatum.STATUS_DB_UNFETCHED) {
+        // Get the protocol
+        String protocol = URLUtil.getProtocol(keyStr);
+
+        // Get the proposed homepage URL
+        String homepage = protocol + "://" + buffer + "/";
+
+        // Check if the current key equals the homepage of the host
+        if (keyStr.equals(homepage)) {
+          // Check if this is a redirect to the real home page
+          if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM ||
+            crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+
+            // Obtain the repr url for this redirect via protocolstatus from the metadata
+            ProtocolStatus z = (ProtocolStatus)crawlDatum.getMetaData().
+              get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+
+            // The protocol status or its arguments may be missing, in which
+            // case there is no redirect target to record
+            args = (z == null) ? null : z.getArgs();
+
+            // ..and the possible redirect URL
+            reprUrl = (args == null || args.length == 0) ? null : args[0];
+
+            // Am i a redirect?
+            if (reprUrl != null) {
+              LOG.info("UpdateHostDb: homepage: " + keyStr + " redirects to: " + reprUrl);
+              // Set the homepage before collecting: the datum is written out
+              // by collect(), later changes never reach the reducer
+              hostDatum.setHomepageUrl(reprUrl);
+              output.collect(host, new NutchWritable(hostDatum));
+            } else {
+              LOG.info("UpdateHostDb: homepage: " + keyStr + 
+                " redirects to: " + reprUrl + " but has been filtered out");
+            }
+          } else {
+            hostDatum.setHomepageUrl(homepage);
+            output.collect(host, new NutchWritable(hostDatum));
+            LOG.info("UpdateHostDb: homepage: " + homepage);
+          }
+        }
+      }
+
+      // Always emit crawl datum
+      output.collect(host, new NutchWritable(crawlDatum));
+    } else if (value instanceof HostDatum) {
+      // We got a record from the hostdb
+      buffer = filterNormalize(keyStr);
+
+      // Filtered out?
+      if (buffer == null) {
+        reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+        LOG.info("UpdateHostDb: " + key.toString() + " hostdatum has been filtered");
+        return;
+      }
+
+      // Get a HostDatum
+      hostDatum = (HostDatum)value;
+      key.set(buffer);
+
+      // If we're also reading CrawlDb entries, reset db_* statistics because
+      // we're aggregating them from CrawlDB anyway
+      if (readingCrawlDb) {
+        hostDatum.resetStatistics();
+      }
+
+      output.collect(key, new NutchWritable(hostDatum));
+    } else if (value instanceof Text) {
+      // We got a record with host scores
+      buffer = filterNormalize(keyStr);
+
+      // Filtered out?
+      if (buffer == null) {
+        reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+        LOG.info("UpdateHostDb: " + key.toString() + " score has been filtered");
+        return;
+      }
+
+      key.set(buffer);
+
+      output.collect(key,
+        new NutchWritable(new FloatWritable(Float.parseFloat(value.toString()))));
+    }
+  }
+}
\ No newline at end of file

Added: nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java?rev=1725952&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java Thu Jan 21 13:58:56 2016
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+
+import com.tdunning.math.stats.TDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ */
+public class UpdateHostDbReducer
+  implements Reducer<Text, NutchWritable, Text, HostDatum> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbReducer.class);
+  // Most recently created resolver task, assigned in reduce()
+  protected ResolverThread resolverThread = null;
+  // Size of the resolver thread pool, overwritten from the job in configure()
+  protected Integer numResolverThreads = 10;
+  // The static settings below are (re)assigned from the job in configure()
+  // Threshold passed on to every ResolverThread; -1 is the default
+  protected static Integer purgeFailedHostsThreshold = -1;
+  // Recheck interval in milliseconds (configured value times 1000)
+  protected static Integer recheckInterval = 86400000;
+  protected static boolean checkFailed = false;
+  protected static boolean checkNew = false;
+  protected static boolean checkKnown = false;
+  protected static boolean force = false;
+  // Timestamp in milliseconds taken once, when this class is initialized
+  protected static long now = new Date().getTime();
+  // Names of the CrawlDatum metadata fields to aggregate; null if not configured
+  protected static String[] numericFields;
+  protected static String[] stringFields;
+  // Percentiles to emit for every numeric field
+  protected static int[] percentiles;
+  // The field names above as Text, used as keys for the metadata lookups
+  protected static Text[] numericFieldWritables;
+  protected static Text[] stringFieldWritables;
+  
+  // Hand-off queue of the executor; reduce() puts resolver tasks on it directly
+  protected BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+  // Thread pool running the resolver tasks, created in configure()
+  protected ThreadPoolExecutor executor = null;
+
+  /**
+    * Reads the reducer settings from the job, prepares the metadata field
+    * keys, configures the thread pool and prestarts all resolver threads.
+    *
+    * @param job the job configuration
+    */
+  public void configure(JobConf job) {
+    purgeFailedHostsThreshold = job.getInt(UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1);
+    numResolverThreads = job.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, 10);
+
+    // The interval is converted to milliseconds. Multiply as long and clamp to
+    // the int range: a plain int multiplication silently overflows for
+    // intervals longer than about 24 days.
+    long recheckMillis = job.getInt(UpdateHostDb.HOSTDB_RECHECK_INTERVAL, 86400) * 1000L;
+    recheckInterval = (int) Math.max((long) Integer.MIN_VALUE,
+      Math.min((long) Integer.MAX_VALUE, recheckMillis));
+
+    checkFailed = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_FAILED, false);
+    checkNew = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_NEW, false);
+    checkKnown = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_KNOWN, false);
+    force = job.getBoolean(UpdateHostDb.HOSTDB_FORCE_CHECK, false);
+    numericFields = job.getStrings(UpdateHostDb.HOSTDB_NUMERIC_FIELDS);
+    stringFields = job.getStrings(UpdateHostDb.HOSTDB_STRING_FIELDS);
+    percentiles = job.getInts(UpdateHostDb.HOSTDB_PERCENTILES);
+
+    // What fields do we need to collect metadata from
+    if (numericFields != null) {
+      numericFieldWritables = new Text[numericFields.length];
+      for (int i = 0; i < numericFields.length; i++) {
+        numericFieldWritables[i] = new Text(numericFields[i]);
+      }
+    }
+
+    if (stringFields != null) {
+      stringFieldWritables = new Text[stringFields.length];
+      for (int i = 0; i < stringFields.length; i++) {
+        stringFieldWritables[i] = new Text(stringFields[i]);
+      }
+    }
+
+    // Initialize the thread pool with our queue
+    executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads,
+      5, TimeUnit.SECONDS, queue);
+
+    // Run all threads in the pool
+    executor.prestartAllCoreThreads();
+  }
+
+  /**
+    * Merges all records of one host into a single HostDatum.
+    *
+    * CrawlDatum values are counted per status and, for fetched and
+    * not-modified pages, their configured metadata fields are aggregated
+    * (value counts for string fields; minimum, maximum, average and
+    * percentiles for numeric fields). HostDatum values contribute homepage,
+    * last check, failure counts, metadata and score. A FloatWritable value
+    * is a score that takes precedence over the score of a HostDatum.
+    *
+    * Hosts for which {@link #shouldCheck(HostDatum)} returns true are handed
+    * to a resolver thread, which writes the datum itself. All other hosts are
+    * written here.
+    *
+    * @param key the host name
+    * @param values all CrawlDatum, HostDatum and score records of the host
+    * @param output collector receiving the merged HostDatum
+    * @param reporter used for the job counters
+    * @throws IOException if writing to the collector fails
+    */
+  public void reduce(Text key, Iterator<NutchWritable> values,
+    OutputCollector<Text,HostDatum> output, Reporter reporter) throws IOException {
+
+    Map<String,Map<String,Integer>> stringCounts = new HashMap<String,Map<String, Integer>>();
+    Map<String,Float> maximums = new HashMap<String,Float>();
+    Map<String,Float> sums = new HashMap<String,Float>(); // used to calc averages
+    Map<String,Integer> counts = new HashMap<String,Integer>(); // used to calc averages
+    Map<String,Float> minimums = new HashMap<String,Float>();
+    Map<String,TDigest> tdigests = new HashMap<String,TDigest>();
+
+    HostDatum hostDatum = new HostDatum();
+    float score = 0;
+
+    if (stringFields != null) {
+      for (int i = 0; i < stringFields.length; i++) {
+        stringCounts.put(stringFields[i], new HashMap<String,Integer>());
+      }
+    }
+
+    // Loop through all values until we find a non-empty HostDatum or use
+    // an empty if this is a new host for the host db
+    while (values.hasNext()) {
+      Writable value = values.next().get();
+
+      // Count crawl datum status's and collect metadata from fields
+      if (value instanceof CrawlDatum) {
+        CrawlDatum buffer = (CrawlDatum)value;
+
+        // Set the correct status field
+        switch (buffer.getStatus()) {
+          case CrawlDatum.STATUS_DB_UNFETCHED:
+            hostDatum.setUnfetched(hostDatum.getUnfetched() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_FETCHED:
+            hostDatum.setFetched(hostDatum.getFetched() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_GONE:
+            hostDatum.setGone(hostDatum.getGone() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_REDIR_TEMP:
+            hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_REDIR_PERM:
+            hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1);
+            break;
+
+          case CrawlDatum.STATUS_DB_NOTMODIFIED:
+            hostDatum.setNotModified(hostDatum.getNotModified() + 1);
+            break;
+        }
+
+        // Record connection failures
+        if (buffer.getRetriesSinceFetch() != 0) {
+          hostDatum.incConnectionFailures();
+        }
+
+        // Only gather metadata statistics for proper fetched pages
+        if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+          // Deal with the string fields
+          if (stringFields != null) {
+            for (int i = 0; i < stringFields.length; i++) {
+              // Does this field exist?
+              Writable raw = buffer.getMetaData().get(stringFieldWritables[i]);
+              if (raw == null) {
+                continue;
+              }
+
+              // Get it!
+              String metadataValue;
+              try {
+                metadataValue = raw.toString();
+              } catch (Exception e) {
+                // Skip the value instead of counting it under a null key
+                LOG.error("Metadata field " + stringFields[i] + " could not be read as a string value");
+                continue;
+              }
+
+              // Increment the count of this value, starting at one
+              Map<String,Integer> valueCounts = stringCounts.get(stringFields[i]);
+              Integer seen = valueCounts.get(metadataValue);
+              valueCounts.put(metadataValue, seen == null ? 1 : seen + 1);
+            }
+          }
+
+          // Deal with the numeric fields
+          if (numericFields != null) {
+            for (int i = 0; i < numericFields.length; i++) {
+              // Does this field exist?
+              Writable raw = buffer.getMetaData().get(numericFieldWritables[i]);
+              if (raw == null) {
+                continue;
+              }
+
+              try {
+                // Get it!
+                String field = numericFields[i];
+                float metadataValue = Float.parseFloat(raw.toString());
+
+                // Feed the digest used for the percentiles
+                TDigest tdigest = tdigests.get(field);
+                if (tdigest == null) {
+                  tdigest = TDigest.createDigest(100);
+                  tdigests.put(field, tdigest);
+                }
+                tdigest.add((double)metadataValue);
+
+                // Keep the value if it is lower than the existing minimum
+                Float minimum = minimums.get(field);
+                if (minimum == null || metadataValue < minimum) {
+                  minimums.put(field, metadataValue);
+                }
+
+                // Keep the value if it is higher than the existing maximum
+                Float maximum = maximums.get(field);
+                if (maximum == null || metadataValue > maximum) {
+                  maximums.put(field, metadataValue);
+                }
+
+                // Sum it up!
+                Float sum = sums.get(field);
+                if (sum == null) {
+                  sums.put(field, metadataValue);
+                  counts.put(field, 1);
+                } else {
+                  sums.put(field, sum + metadataValue);
+                  counts.put(field, counts.get(field) + 1);
+                }
+              } catch (Exception e) {
+                LOG.error(e.getMessage() + " when processing values for " + key.toString());
+              }
+            }
+          }
+        }
+      }
+
+      // Take over the properties of an existing host datum
+      if (value instanceof HostDatum) {
+        HostDatum buffer = (HostDatum)value;
+
+        // Check homepage URL
+        if (buffer.hasHomepageUrl()) {
+          hostDatum.setHomepageUrl(buffer.getHomepageUrl());
+        }
+
+        // Check lastCheck timestamp
+        if (!buffer.isEmpty()) {
+          hostDatum.setLastCheck(buffer.getLastCheck());
+        }
+
+        // Check and set DNS failures
+        if (buffer.getDnsFailures() > 0) {
+          hostDatum.setDnsFailures(buffer.getDnsFailures());
+        }
+
+        // Check and set connection failures
+        if (buffer.getConnectionFailures() > 0) {
+          hostDatum.setConnectionFailures(buffer.getConnectionFailures());
+        }
+
+        // Check metadata
+        if (!buffer.getMetaData().isEmpty()) {
+          hostDatum.setMetaData(buffer.getMetaData());
+        }
+
+        // Check and set score (score from Web Graph has precedence)
+        if (buffer.getScore() > 0) {
+          hostDatum.setScore(buffer.getScore());
+        }
+      }
+
+      // Check for the score
+      if (value instanceof FloatWritable) {
+        FloatWritable buffer = (FloatWritable)value;
+        score = buffer.get();
+      }
+    }
+
+    // Check if score was set from Web Graph
+    if (score > 0) {
+      hostDatum.setScore(score);
+    }
+
+    // Set metadata
+    for (Map.Entry<String, Map<String,Integer>> entry : stringCounts.entrySet()) {
+      for (Map.Entry<String,Integer> subEntry : entry.getValue().entrySet()) {
+        hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new IntWritable(subEntry.getValue()));
+      }
+    }
+    for (Map.Entry<String, Float> entry : maximums.entrySet()) {
+      hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue()));
+    }
+    for (Map.Entry<String, Float> entry : sums.entrySet()) {
+      hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / counts.get(entry.getKey())));
+    }
+    if (percentiles != null) {
+      for (Map.Entry<String, TDigest> entry : tdigests.entrySet()) {
+        // Emit all percentiles. quantile() takes a fraction between 0 and 1,
+        // so the configured integer percentile is scaled down by 100.
+        for (int i = 0; i < percentiles.length; i++) {
+          hostDatum.getMetaData().put(new Text("pct" + Integer.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile(percentiles[i] / 100.0)));
+        }
+      }
+    }
+    for (Map.Entry<String, Float> entry : minimums.entrySet()) {
+      hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue()));
+    }
+
+    reporter.incrCounter("UpdateHostDb", "total_hosts", 1);
+
+    // See if this record is to be checked
+    if (shouldCheck(hostDatum)) {
+      // Make an entry
+      resolverThread = new ResolverThread(key.toString(), hostDatum, output, reporter, purgeFailedHostsThreshold);
+
+      // Add the entry to the queue (blocking)
+      try {
+        queue.put(resolverThread);
+
+        // Do not progress, the datum will be written in the resolver thread
+        return;
+      } catch (InterruptedException e) {
+        // The task never reached a resolver thread. Restore the interrupt
+        // status for the caller and fall through, so the host is written
+        // unchecked below instead of being silently dropped.
+        Thread.currentThread().interrupt();
+        LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+      }
+    } else {
+      reporter.incrCounter("UpdateHostDb", "skipped_not_eligible", 1);
+      LOG.info("UpdateHostDb: " + key.toString() + ": skipped_not_eligible");
+    }
+
+    // Write the host datum if it wasn't written by the resolver thread
+    output.collect(key, hostDatum);
+  }
+
+  /**
+    * Decides whether a host record has to be handed over for checking.
+    * An empty (new) record qualifies as soon as checkNew is enabled. Non-empty
+    * records without DNS failures (checkKnown) and records with DNS failures
+    * (checkFailed) qualify only if {@link #isEligibleForCheck(HostDatum)}
+    * agrees.
+    *
+    * @param datum the host record to inspect
+    * @return true if the record should be checked, false otherwise
+    */
+  protected boolean shouldCheck(HostDatum datum) {
+    // New records are checked unconditionally when requested
+    if (checkNew && datum.isEmpty()) {
+      return true;
+    }
+
+    // Existing known hosts that have no DNS failures on record
+    boolean candidate = checkKnown && !datum.isEmpty() && datum.getDnsFailures() == 0;
+
+    // Otherwise, hosts that did fail DNS lookups before
+    if (!candidate) {
+      candidate = checkFailed && datum.getDnsFailures() > 0;
+    }
+
+    // Candidates still need to be eligible; anything else is not checked
+    return candidate && isEligibleForCheck(datum);
+  }
+
+  /**
+    * Determines whether a record is eligible for recheck. A record is eligible
+    * when rechecks are forced, or when the waiting period since its last check
+    * has elapsed. The waiting period grows linearly with the number of DNS
+    * failures: recheckInterval * (dnsFailures + 1).
+    *
+    * @param datum the host record to inspect
+    * @return true if the record may be rechecked now, false otherwise
+    */
+  protected boolean isEligibleForCheck(HostDatum datum) {
+    // A forced run rechecks every host regardless of its last check
+    if (force) {
+      return true;
+    }
+
+    // Back off one extra interval per DNS failure. Widen to long before
+    // multiplying so a large failure count cannot overflow the product.
+    long waitingPeriod = (long) recheckInterval * (datum.getDnsFailures() + 1);
+
+    // Eligible only once the waiting period after the last check has passed
+    return now >= datum.getLastCheck().getTime() + waitingPeriod;
+  }
+
+  /**
+    * Signals the executor to shut down and blocks until it has terminated,
+    * polling once per second and logging the pool size while waiting.
+    */
+  public void close() {
+    LOG.info("UpdateHostDb: feeder finished, waiting for shutdown");
+
+    // All keys have been fed at this point, so no new work will arrive
+    executor.shutdown();
+
+    // Poll until every resolver has finished
+    while (!executor.isTerminated()) {
+      LOG.info("UpdateHostDb: resolver threads waiting: " + executor.getPoolSize());
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Log the interruption and keep waiting for the resolvers
+        LOG.warn(StringUtils.stringifyException(e));
+      }
+    }
+  }
+}
\ No newline at end of file