You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2017/07/26 11:20:57 UTC

[nutch] branch master updated: NUTCH-2368 Variable generate.max.count and fetcher.server.delay

This is an automated email from the ASF dual-hosted git repository.

markus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git


The following commit(s) were added to refs/heads/master by this push:
     new 44f7ad9  NUTCH-2368 Variable generate.max.count and fetcher.server.delay
44f7ad9 is described below

commit 44f7ad973f2017bacde2bf5277f846179eafc6dd
Author: Markus Jelsma <ma...@apache.org>
AuthorDate: Wed Jul 26 13:20:26 2017 +0200

    NUTCH-2368 Variable generate.max.count and fetcher.server.delay
---
 conf/nutch-default.xml                             |  24 ++++
 src/java/org/apache/nutch/crawl/Generator.java     | 159 +++++++++++++++++++--
 .../org/apache/nutch/fetcher/FetchItemQueue.java   |  28 +++-
 3 files changed, 198 insertions(+), 13 deletions(-)

diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml
index 9752be2..c406907 100644
--- a/conf/nutch-default.xml
+++ b/conf/nutch-default.xml
@@ -774,6 +774,30 @@
   generate.min.interval. A value of -1 disables this check.</description>
 </property>
 
+<property>
+  <name>generate.hostdb</name>
+  <value></value>
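+  <description>Path to the HostDB directory (its "current" subdirectory is read). Required for
+  generate.max.count.expr and generate.fetch.delay.expr to take effect; can also be set with the
+  Generator's -hostdb option. See https://issues.apache.org/jira/browse/NUTCH-2368</description>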
+</property>
+
+<property>
+  <name>generate.fetch.delay.expr</name>
+  <value></value>
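+  <description>Jexl expression, evaluated against a host's HostDB record, whose rounded result (in milliseconds)
+  replaces fetcher.server.delay for that host's fetch queue. Variables: dnsFailures, connectionFailures, unfetched,
+  fetched, notModified, redirTemp, redirPerm, gone, conf and numeric/text HostDB metadata. Requires generate.hostdb. See https://issues.apache.org/jira/browse/NUTCH-2368</description>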
+</property>
+
+<property>
+  <name>generate.max.count.expr</name>
+  <value></value>
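+  <description>Jexl expression, evaluated against a host's HostDB record, whose rounded result
+  replaces generate.max.count for that host. Variables: dnsFailures, connectionFailures, unfetched, fetched,
+  notModified, redirTemp, redirPerm, gone, conf and numeric/text HostDB metadata. Requires generate.hostdb. See https://issues.apache.org/jira/browse/NUTCH-2368</description>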
+</property>
+
 <!-- urlpartitioner properties -->
 
 <property>
diff --git a/src/java/org/apache/nutch/crawl/Generator.java b/src/java/org/apache/nutch/crawl/Generator.java
index 677cd2f..21607ec 100644
--- a/src/java/org/apache/nutch/crawl/Generator.java
+++ b/src/java/org/apache/nutch/crawl/Generator.java
@@ -27,6 +27,8 @@ import java.text.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.MapContext;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
@@ -35,6 +37,7 @@ import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.nutch.hostdb.HostDatum;
 import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.net.URLFilterException;
 import org.apache.nutch.net.URLFilters;
@@ -49,7 +52,6 @@ import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.TimingUtil;
 import org.apache.nutch.util.URLUtil;
 
-
 /**
  * Generates a subset of a crawl db to fetch. This version allows to generate
  * fetchlists for several segments in one go. Unlike in the initial version
@@ -78,6 +80,9 @@ public class Generator extends NutchTool implements Tool {
   public static final String GENERATOR_DELAY = "crawl.gen.delay";
   public static final String GENERATOR_MAX_NUM_SEGMENTS = "generate.max.num.segments";
   public static final String GENERATOR_EXPR = "generate.expr";
+  public static final String GENERATOR_HOSTDB = "generate.hostdb";
+  public static final String GENERATOR_MAX_COUNT_EXPR = "generate.max.count.expr";
+  public static final String GENERATOR_FETCH_DELAY_EXPR = "generate.fetch.delay.expr";
 
   public static class SelectorEntry implements Writable {
     public Text url;
@@ -137,8 +142,13 @@ public class Generator extends NutchTool implements Tool {
     private int maxNumSegments = 1;
     private Expression expr = null;
     private int currentsegmentnum = 1;
-
+    private SequenceFile.Reader[] hostdbReaders = null;
+    private Expression maxCountExpr = null;
+    private Expression fetchDelayExpr = null;
+    private JobConf conf = null;
+    
     public void configure(JobConf job) {
+      this.conf = job;
       curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
       limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE)
           / job.getNumReduceTasks();
@@ -167,9 +177,29 @@ public class Generator extends NutchTool implements Tool {
       expr = JexlUtil.parseExpression(job.get(GENERATOR_EXPR, null));
       maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
       segCounts = new int[maxNumSegments];
+      // Optional HostDB: enables per-host maxCount / fetch delay expressions (NUTCH-2368)
+      String hostdbDir = job.get(GENERATOR_HOSTDB);
+      if (hostdbDir != null && !hostdbDir.trim().isEmpty()) { // new Path("") would throw
+        try {
+          hostdbReaders = SequenceFileOutputFormat.getReaders(job, new Path(hostdbDir, "current"));
+          maxCountExpr = JexlUtil.parseExpression(job.get(GENERATOR_MAX_COUNT_EXPR, null));
+          fetchDelayExpr = JexlUtil.parseExpression(job.get(GENERATOR_FETCH_DELAY_EXPR, null));
+        } catch (IOException e) {
+          LOG.error("Error reading HostDB {}", hostdbDir, e); // trailing Throwable keeps the stack trace
+        }
+      }
     }
 
     public void close() {
+      if (hostdbReaders != null) {
+        for (SequenceFile.Reader reader : hostdbReaders) {
+          try { // close each reader independently so one failure does not leak the rest
+            reader.close();
+          } catch (IOException e) {
+            LOG.error("Error closing HostDB reader", e);
+          }
+        }
+      }
     }
 
     /** Select and invert subset due for fetch. */
@@ -252,13 +282,106 @@ public class Generator extends NutchTool implements Tool {
       return partitioner.getPartition(((SelectorEntry) value).url, key,
           numReduceTasks);
     }
+    
+    private HostDatum getHostDatum(String host) throws Exception {
+      if (hostdbReaders == null || host == null) { return null; } // nothing to search / nothing to look up
+      Text key = new Text();
+      HostDatum value = new HostDatum();
+      for (SequenceFile.Reader reader : hostdbReaders) {
+        reader.sync(0); // rewind: an earlier lookup may have left this reader mid-file or at EOF
+        while (reader.next(key, value)) {
+          if (host.equals(key.toString())) {
+            return value;
+          }
+        }
+      }
+      return null; // NOTE(review): full linear scan of the HostDB per lookup
+    }
+    
+    /**
+     * Builds the Jexl context for generate.max.count.expr / generate.fetch.delay.expr:
+     * the HostDB counters below, the job configuration as "conf", and the host's
+     * Float/Int/Long/Text metadata ('-' in metadata names is mapped to '_').
+     */
+    private JexlContext createContext(HostDatum datum) {
+      JexlContext context = new MapContext();
+      // HostDB counters
+      context.set("dnsFailures", datum.getDnsFailures());
+      context.set("connectionFailures", datum.getConnectionFailures());
+      context.set("unfetched", datum.getUnfetched());
+      context.set("fetched", datum.getFetched());
+      context.set("notModified", datum.getNotModified());
+      context.set("redirTemp", datum.getRedirTemp());
+      context.set("redirPerm", datum.getRedirPerm());
+      context.set("gone", datum.getGone());
+      context.set("conf", conf);
+
+      // Expose metadata as Jexl variables. '-' is not legal in a Jexl identifier,
+      // so it is mapped to '_' for every key, whatever the value type.
+      for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) {
+        String name = entry.getKey().toString().replace("-", "_");
+        Object value = entry.getValue();
+
+        if (value instanceof FloatWritable) {
+          context.set(name, ((FloatWritable) value).get());
+        } else if (value instanceof IntWritable) {
+          context.set(name, ((IntWritable) value).get());
+        } else if (value instanceof LongWritable) {
+          context.set(name, ((LongWritable) value).get());
+        } else if (value instanceof Text) {
+          context.set(name, value.toString());
+        }
+      }
+
+      return context;
+    }
 
     /** Collect until limit is reached. */
     public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
         OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
         throws IOException {
-
+        
+      String hostname = null;
+      HostDatum host = null;
+      LongWritable variableFetchDelayWritable = null; // in millis
+      Text variableFetchDelayKey = new Text("_variableFetchDelay_");
+      int maxCount = this.maxCount; // may be overridden below by generate.max.count.expr
       while (values.hasNext()) {
+        SelectorEntry entry = values.next();
+        Text url = entry.url;
+        String urlString = url.toString();
+        URL u = null;
+
+        // HostDB lookup happens once per reduce() call, using the first URL's host.
+        // NOTE(review): values share the reduce key (a FloatWritable), not necessarily a host -- confirm.
+        if (host == null) {
+          try {
+            hostname = URLUtil.getHost(urlString);
+            host = getHostDatum(hostname);
+          } catch (Exception e) { LOG.debug("Generator: HostDB lookup failed for {}", urlString, e); }
+
+          if (host == null) {
+            // No HostDB entry: use a placeholder so the lookup is not repeated
+            host = new HostDatum();
+          } else {
+            if (maxCountExpr != null) {
+              // Number, not (double): Jexl may yield Integer/Long, which a (double) cast rejects
+              long variableMaxCount = Math.round(((Number) maxCountExpr.evaluate(createContext(host))).doubleValue());
+              LOG.info("Generator: variable maxCount: {} for {}", variableMaxCount, hostname);
+              maxCount = (int)variableMaxCount;
+            }
+            if (fetchDelayExpr != null) {
+              long variableFetchDelay = Math.round(((Number) fetchDelayExpr.evaluate(createContext(host))).doubleValue());
+              LOG.info("Generator: variable fetchDelay: {} ms for {}", variableFetchDelay, hostname);
+              variableFetchDelayWritable = new LongWritable(variableFetchDelay);
+            }
+          }
+        }
+
+        // Got a variable fetch delay (possibly zero)? Carry it to the fetcher in the datum's metadata
+        if (variableFetchDelayWritable != null) {
+          entry.datum.getMetaData().put(variableFetchDelayKey, variableFetchDelayWritable);
+        }
 
         if (count == limit) {
           // do we have any segments left?
@@ -269,11 +392,6 @@ public class Generator extends NutchTool implements Tool {
             break;
         }
 
-        SelectorEntry entry = values.next();
-        Text url = entry.url;
-        String urlString = url.toString();
-        URL u = null;
-
         String hostordomain = null;
 
         try {
@@ -503,6 +621,13 @@ public class Generator extends NutchTool implements Tool {
     return generate(dbDir, segments, numLists, topN, curTime, filter, true,
         force, 1, null);
   }
+  
+  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
+      long curTime, boolean filter, boolean norm, boolean force,
+      int maxNumSegments, String expr) throws IOException {
+    return generate(dbDir, segments, numLists, topN, curTime, filter, norm,
+        force, maxNumSegments, expr, null); // no HostDB: same behaviour as before NUTCH-2368
+  }
 
   /**
    * Generate fetchlists in one or more segments. Whether to filter URLs or not
@@ -528,7 +653,7 @@ public class Generator extends NutchTool implements Tool {
    */
   public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
       long curTime, boolean filter, boolean norm, boolean force,
-      int maxNumSegments, String expr) throws IOException {
+      int maxNumSegments, String expr, String hostdb) throws IOException {
 
     Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
         + "/generate-temp-" + java.util.UUID.randomUUID().toString());
@@ -543,10 +668,13 @@ public class Generator extends NutchTool implements Tool {
     LOG.info("Generator: filtering: " + filter);
     LOG.info("Generator: normalizing: " + norm);
     if (topN != Long.MAX_VALUE) {
-      LOG.info("Generator: topN: " + topN);
+      LOG.info("Generator: topN: {}", topN);
+    }
+    if (hostdb != null) {
+      LOG.info("Generator: hostdb: {}", hostdb);
     }
     if (expr != null) {
-      LOG.info("Generator: expr: " + expr);
+      LOG.info("Generator: expr: {}", expr);
     }
     
     // map to inverted subset due for fetch, sort by score
@@ -572,6 +700,9 @@ public class Generator extends NutchTool implements Tool {
     if (expr != null) {
       job.set(GENERATOR_EXPR, expr);
     }
+    if (hostdb != null) {
+      job.set(GENERATOR_HOSTDB, hostdb);
+    }
     FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
 
@@ -728,6 +859,7 @@ public class Generator extends NutchTool implements Tool {
 
     Path dbDir = new Path(args[0]);
     Path segmentsDir = new Path(args[1]);
+    String hostdb = null;
     long curTime = System.currentTimeMillis();
     long topN = Long.MAX_VALUE;
     int numFetchers = -1;
@@ -744,6 +876,9 @@ public class Generator extends NutchTool implements Tool {
       } else if ("-numFetchers".equals(args[i])) {
         numFetchers = Integer.parseInt(args[i + 1]);
         i++;
+      } else if ("-hostdb".equals(args[i])) {
+        hostdb = args[i + 1];
+        i++;
       } else if ("-adddays".equals(args[i])) {
         long numDays = Integer.parseInt(args[i + 1]);
         curTime += numDays * 1000L * 60 * 60 * 24;
@@ -763,7 +898,7 @@ public class Generator extends NutchTool implements Tool {
 
     try {
       Path[] segs = generate(dbDir, segmentsDir, numFetchers, topN, curTime,
-          filter, norm, force, maxNumSegments, expr);
+          filter, norm, force, maxNumSegments, expr, hostdb);
       if (segs == null)
         return 1;
     } catch (Exception e) {
diff --git a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
index 435996b..b67be74 100644
--- a/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
+++ b/src/java/org/apache/nutch/fetcher/FetchItemQueue.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +48,10 @@ public class FetchItemQueue {
   long minCrawlDelay;
   int maxThreads;
   Configuration conf;
-
+  Text cookie;
+  Text variableFetchDelayKey = new Text("_variableFetchDelay_");
+  boolean variableFetchDelaySet = false;
+  
   public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
       long minCrawlDelay) {
     this.conf = conf;
@@ -85,6 +90,19 @@ public class FetchItemQueue {
   public void addFetchItem(FetchItem it) {
     if (it == null)
       return;
+
+    // The Generator may attach a per-host fetch delay; the first one seen configures this queue.
+    if (it.datum.getMetaData().containsKey(variableFetchDelayKey)) {
+      if (!variableFetchDelaySet) {
+        variableFetchDelaySet = true;
+        long delay = ((LongWritable) it.datum.getMetaData().get(variableFetchDelayKey)).get();
+        crawlDelay = delay;
+        minCrawlDelay = delay;
+        // NOTE(review): back-dates endTime by one delay; confirm intended first-fetch timing
+        setEndTime(System.currentTimeMillis() - crawlDelay);
+      }
+      it.datum.getMetaData().remove(variableFetchDelayKey); // strip the marker from every item
+    }
     queue.add(it);
   }
 
@@ -113,6 +131,14 @@ public class FetchItemQueue {
     }
     return it;
   }
+  /** Associates a cookie with this queue, replacing any previously set value. */
+  public void setCookie(Text cookie) {
+    this.cookie = cookie;
+  }
+  /** @return the cookie last passed to {@link #setCookie}, or null if none has been set. */
+  public Text getCookie() {
+    return cookie;
+  }
 
   public synchronized void dump() {
     LOG.info("  maxThreads    = " + maxThreads);

-- 
To stop receiving notification emails like this one, please contact
['"commits@nutch.apache.org" <co...@nutch.apache.org>'].