You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@nutch.apache.org by jn...@apache.org on 2014/05/16 09:59:05 UTC

svn commit: r1595137 - in /nutch/trunk: CHANGES.txt src/java/org/apache/nutch/crawl/Injector.java

Author: jnioche
Date: Fri May 16 07:59:05 2014
New Revision: 1595137

URL: http://svn.apache.org/r1595137
Log:
NUTCH-1772 Injector does not need merging if no pre-existing crawldb

Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1595137&r1=1595136&r2=1595137&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri May 16 07:59:05 2014
@@ -2,6 +2,8 @@ Nutch Change Log
 
 Nutch Current Development
 
+* NUTCH-1772 Injector does not need merging if no pre-existing crawldb (jnioche)
+
 * NUTCH-1752 Cache robots.txt rules per protocol:host:port (snagel)
 
 * NUTCH-1613 Timeouts in protocol-httpclient when crawling same host with >2 threads (brian44 via jnioche)

Modified: nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java?rev=1595137&r1=1595136&r2=1595137&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java Fri May 16 07:59:05 2014
@@ -39,28 +39,38 @@ import org.apache.nutch.util.NutchConfig
 import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.TimingUtil;
 
-/** This class takes a flat file of URLs and adds them to the of pages to be
- * crawled.  Useful for bootstrapping the system. 
- * The URL files contain one URL per line, optionally followed by custom metadata 
- * separated by tabs with the metadata key separated from the corresponding value by '='. <br>
+/**
+ * This class takes a flat file of URLs and adds them to the of pages to be
+ * crawled. Useful for bootstrapping the system. The URL files contain one URL
+ * per line, optionally followed by custom metadata separated by tabs with the
+ * metadata key separated from the corresponding value by '='. <br>
  * Note that some metadata keys are reserved : <br>
  * - <i>nutch.score</i> : allows to set a custom score for a specific URL <br>
- * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a specific URL <br>
- * - <i>nutch.fetchInterval.fixed</i> : allows to set a custom fetch interval for a specific URL that is not changed by AdaptiveFetchSchedule <br>
- * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
+ * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a
+ * specific URL <br>
+ * - <i>nutch.fetchInterval.fixed</i> : allows to set a custom fetch interval
+ * for a specific URL that is not changed by AdaptiveFetchSchedule <br>
+ * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000
+ * \t userType=open_source
  **/
 public class Injector extends Configured implements Tool {
   public static final Logger LOG = LoggerFactory.getLogger(Injector.class);
-  
+
   /** metadata key reserved for setting a custom score for a specific URL */
   public static String nutchScoreMDName = "nutch.score";
-  /** metadata key reserved for setting a custom fetchInterval for a specific URL */
+  /**
+   * metadata key reserved for setting a custom fetchInterval for a specific URL
+   */
   public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
-  /** metadata key reserved for setting a fixed custom fetchInterval for a specific URL */
+  /**
+   * metadata key reserved for setting a fixed custom fetchInterval for a
+   * specific URL
+   */
   public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed";
 
   /** Normalize and filter injected urls. */
-  public static class InjectMapper implements Mapper<WritableComparable<?>, Text, Text, CrawlDatum> {
+  public static class InjectMapper implements
+      Mapper<WritableComparable<?>, Text, Text, CrawlDatum> {
     private URLNormalizers urlNormalizers;
     private int interval;
     private float scoreInjected;
@@ -76,19 +86,21 @@ public class Injector extends Configured
       filters = new URLFilters(jobConf);
       scfilters = new ScoringFilters(jobConf);
       scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
-      curTime = job.getLong("injector.current.time", System.currentTimeMillis());
+      curTime = job
+          .getLong("injector.current.time", System.currentTimeMillis());
     }
 
-    public void close() {}
+    public void close() {
+    }
 
     public void map(WritableComparable<?> key, Text value,
-                    OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-      throws IOException {
-      String url = value.toString().trim();              // value is line of text
-
-      if (url != null && ( url.length() == 0 || url.startsWith("#") ) ) {
-          /* Ignore line that start with # */
-          return;
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
+      String url = value.toString().trim(); // value is line of text
+
+      if (url != null && (url.length() == 0 || url.startsWith("#"))) {
+        /* Ignore line that start with # */
+        return;
       }
 
       // if tabs : metadata that could be stored
@@ -96,55 +108,60 @@ public class Injector extends Configured
       float customScore = -1f;
       int customInterval = interval;
       int fixedInterval = -1;
-      Map<String,String> metadata = new TreeMap<String,String>();
-      if (url.indexOf("\t")!=-1){
-    	  String[] splits = url.split("\t");
-    	  url = splits[0];
-    	  for (int s=1;s<splits.length;s++){
-    		  // find separation between name and value
-    		  int indexEquals = splits[s].indexOf("=");
-    		  if (indexEquals==-1) {
-    			  // skip anything without a =
-    			  continue;		    
-    		  }
-    		  String metaname = splits[s].substring(0, indexEquals);
-    		  String metavalue = splits[s].substring(indexEquals+1);
-    		  if (metaname.equals(nutchScoreMDName)) {
-    			  try {
-    			  customScore = Float.parseFloat(metavalue);}
-    			  catch (NumberFormatException nfe){}
-    		  }
-                  else if (metaname.equals(nutchFetchIntervalMDName)) {
-                          try {
-                                  customInterval = Integer.parseInt(metavalue);}
-                          catch (NumberFormatException nfe){}
-                  }
-                  else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
-                          try {
-                                  fixedInterval = Integer.parseInt(metavalue);}
-                          catch (NumberFormatException nfe){}
-                  }
-    		  else metadata.put(metaname,metavalue);
-    	  }
+      Map<String, String> metadata = new TreeMap<String, String>();
+      if (url.indexOf("\t") != -1) {
+        String[] splits = url.split("\t");
+        url = splits[0];
+        for (int s = 1; s < splits.length; s++) {
+          // find separation between name and value
+          int indexEquals = splits[s].indexOf("=");
+          if (indexEquals == -1) {
+            // skip anything without a =
+            continue;
+          }
+          String metaname = splits[s].substring(0, indexEquals);
+          String metavalue = splits[s].substring(indexEquals + 1);
+          if (metaname.equals(nutchScoreMDName)) {
+            try {
+              customScore = Float.parseFloat(metavalue);
+            } catch (NumberFormatException nfe) {
+            }
+          } else if (metaname.equals(nutchFetchIntervalMDName)) {
+            try {
+              customInterval = Integer.parseInt(metavalue);
+            } catch (NumberFormatException nfe) {
+            }
+          } else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
+            try {
+              fixedInterval = Integer.parseInt(metavalue);
+            } catch (NumberFormatException nfe) {
+            }
+          } else
+            metadata.put(metaname, metavalue);
+        }
       }
       try {
         url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
-        url = filters.filter(url);             // filter the url
+        url = filters.filter(url); // filter the url
       } catch (Exception e) {
-        if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); }
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Skipping " + url + ":" + e);
+        }
         url = null;
       }
       if (url == null) {
         reporter.getCounter("injector", "urls_filtered").increment(1);
-      } else {                                   // if it passes
-        value.set(url);                           // collect it
+      } else { // if it passes
+        value.set(url); // collect it
         CrawlDatum datum = new CrawlDatum();
         datum.setStatus(CrawlDatum.STATUS_INJECTED);
 
         // Is interval custom? Then set as meta data
         if (fixedInterval > -1) {
-          // Set writable using float. Flaot is used by AdaptiveFetchSchedule
-          datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY, new FloatWritable(fixedInterval));
+          // Set writable using float. Flaot is used by
+          // AdaptiveFetchSchedule
+          datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
+              new FloatWritable(fixedInterval));
           datum.setFetchInterval(fixedInterval);
         } else {
           datum.setFetchInterval(customInterval);
@@ -153,20 +170,22 @@ public class Injector extends Configured
         datum.setFetchTime(curTime);
         // now add the metadata
         Iterator<String> keysIter = metadata.keySet().iterator();
-        while (keysIter.hasNext()){
-        	String keymd = keysIter.next();
-        	String valuemd = metadata.get(keymd);
-        	datum.getMetaData().put(new Text(keymd), new Text(valuemd));
+        while (keysIter.hasNext()) {
+          String keymd = keysIter.next();
+          String valuemd = metadata.get(keymd);
+          datum.getMetaData().put(new Text(keymd), new Text(valuemd));
         }
-        if (customScore != -1) datum.setScore(customScore);
-        else datum.setScore(scoreInjected);
+        if (customScore != -1)
+          datum.setScore(customScore);
+        else
+          datum.setScore(scoreInjected);
         try {
-        	scfilters.injectedScore(value, datum);
+          scfilters.injectedScore(value, datum);
         } catch (ScoringFilterException e) {
-        	if (LOG.isWarnEnabled()) {
-        		LOG.warn("Cannot filter injected score for url " + url
-        				+ ", using default (" + e.getMessage() + ")");
-        	}
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Cannot filter injected score for url " + url
+                + ", using default (" + e.getMessage() + ")");
+          }
         }
         reporter.getCounter("injector", "urls_injected").increment(1);
         output.collect(value, datum);
@@ -175,7 +194,8 @@ public class Injector extends Configured
   }
 
   /** Combine multiple new entries for a url. */
-  public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+  public static class InjectReducer implements
+      Reducer<Text, CrawlDatum, Text, CrawlDatum> {
     private int interval;
     private float scoreInjected;
     private boolean overwrite = false;
@@ -189,15 +209,16 @@ public class Injector extends Configured
       LOG.info("Injector: overwrite: " + overwrite);
       LOG.info("Injector: update: " + update);
     }
-    
-    public void close() {}
+
+    public void close() {
+    }
 
     private CrawlDatum old = new CrawlDatum();
     private CrawlDatum injected = new CrawlDatum();
-    
+
     public void reduce(Text key, Iterator<CrawlDatum> values,
-                       OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-      throws IOException {
+        OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+        throws IOException {
       boolean oldSet = false;
       boolean injectedSet = false;
       while (values.hasNext()) {
@@ -210,29 +231,35 @@ public class Injector extends Configured
           old.set(val);
           oldSet = true;
         }
+
       }
 
       CrawlDatum res = null;
-                
+
       // Old default behaviour
       if (injectedSet && !oldSet) {
         res = injected;
       } else {
         res = old;
       }
-      
+      if (injectedSet && oldSet) {
+        reporter.getCounter("injector", "urls_merged").increment(1);
+      }
       /**
        * Whether to overwrite, ignore or update existing records
+       * 
        * @see https://issues.apache.org/jira/browse/NUTCH-1405
        */
       // Injected record already exists and update but not overwrite
       if (injectedSet && oldSet && update && !overwrite) {
         res = old;
         old.putAllMetaData(injected);
-        old.setScore(injected.getScore() != scoreInjected ? injected.getScore() : old.getScore());
-        old.setFetchInterval(injected.getFetchInterval() != interval ? injected.getFetchInterval() : old.getFetchInterval());
+        old.setScore(injected.getScore() != scoreInjected ? injected.getScore()
+            : old.getScore());
+        old.setFetchInterval(injected.getFetchInterval() != interval ? injected
+            .getFetchInterval() : old.getFetchInterval());
       }
-      
+
       // Injected record already exists and overwrite
       if (injectedSet && oldSet && overwrite) {
         res = injected;
@@ -242,12 +269,13 @@ public class Injector extends Configured
     }
   }
 
-  public Injector() {}
-  
+  public Injector() {
+  }
+
   public Injector(Configuration conf) {
     setConf(conf);
   }
-  
+
   public void inject(Path crawlDb, Path urlDir) throws IOException {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
@@ -257,56 +285,92 @@ public class Injector extends Configured
       LOG.info("Injector: urlDir: " + urlDir);
     }
 
-    Path tempDir =
-      new Path(getConf().get("mapred.temp.dir", ".") +
-               "/inject-temp-"+
-               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
+        + "/inject-temp-"
+        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
     // map text input file to a <url,CrawlDatum> file
     if (LOG.isInfoEnabled()) {
       LOG.info("Injector: Converting injected urls to crawl db entries.");
     }
+
+    FileSystem fs = FileSystem.get(getConf());
+    // determine if the crawldb already exists
+    boolean dbExists = fs.exists(crawlDb);
+
     JobConf sortJob = new NutchJob(getConf());
     sortJob.setJobName("inject " + urlDir);
     FileInputFormat.addInputPath(sortJob, urlDir);
     sortJob.setMapperClass(InjectMapper.class);
 
     FileOutputFormat.setOutputPath(sortJob, tempDir);
-    sortJob.setOutputFormat(SequenceFileOutputFormat.class);
+    if (dbExists) {
+      // Don't run merge injected urls, wait for merge with
+      // existing DB
+      sortJob.setOutputFormat(SequenceFileOutputFormat.class);
+      sortJob.setNumReduceTasks(0);
+    } else {
+      sortJob.setOutputFormat(MapFileOutputFormat.class);
+      sortJob.setReducerClass(InjectReducer.class);
+      sortJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+          false);
+    }
     sortJob.setOutputKeyClass(Text.class);
     sortJob.setOutputValueClass(CrawlDatum.class);
     sortJob.setLong("injector.current.time", System.currentTimeMillis());
-    RunningJob mapJob = JobClient.runJob(sortJob);
 
-    long urlsInjected = mapJob.getCounters().findCounter("injector", "urls_injected").getValue();
-    long urlsFiltered = mapJob.getCounters().findCounter("injector", "urls_filtered").getValue();
-    LOG.info("Injector: total number of urls rejected by filters: " + urlsFiltered);
-    LOG.info("Injector: total number of urls injected after normalization and filtering: "
+    RunningJob mapJob = null;
+    try {
+      mapJob = JobClient.runJob(sortJob);
+    } catch (IOException e) {
+      fs.delete(tempDir, true);
+      throw e;
+    }
+    long urlsInjected = mapJob.getCounters()
+        .findCounter("injector", "urls_injected").getValue();
+    long urlsFiltered = mapJob.getCounters()
+        .findCounter("injector", "urls_filtered").getValue();
+    LOG.info("Injector: Total number of urls rejected by filters: "
+        + urlsFiltered);
+    LOG.info("Injector: Total number of urls after normalization: "
         + urlsInjected);
-
-    // merge with existing crawl db
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Injector: Merging injected urls into crawl db.");
+    long urlsMerged = 0;
+    if (dbExists) {
+      // merge with existing crawl db
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Injector: Merging injected urls into crawl db.");
+      }
+      JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
+      FileInputFormat.addInputPath(mergeJob, tempDir);
+      mergeJob.setReducerClass(InjectReducer.class);
+      try {
+        RunningJob merge = JobClient.runJob(mergeJob);
+        urlsMerged = merge.getCounters().findCounter("injector", "urls_merged")
+            .getValue();
+        LOG.info("Injector: URLs merged: " + urlsMerged);
+      } catch (IOException e) {
+        fs.delete(tempDir, true);
+        throw e;
+      }
+      CrawlDb.install(mergeJob, crawlDb);
+    } else {
+      CrawlDb.install(sortJob, crawlDb);
     }
-    JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
-    FileInputFormat.addInputPath(mergeJob, tempDir);
-    mergeJob.setReducerClass(InjectReducer.class);
-    JobClient.runJob(mergeJob);
-    CrawlDb.install(mergeJob, crawlDb);
 
     // clean up
-    FileSystem fs = FileSystem.get(getConf());
     fs.delete(tempDir, true);
-
+    LOG.info("Injector: Total new urls injected: "
+        + (urlsInjected - urlsMerged));
     long end = System.currentTimeMillis();
-    LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+    LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
   }
 
   public static void main(String[] args) throws Exception {
     int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
     System.exit(res);
   }
-  
+
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
       System.err.println("Usage: Injector <crawldb> <url_dir>");