You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2012/11/22 15:45:08 UTC

svn commit: r1412566 - in /nutch/branches/2.x: conf/gora-cassandra-mapping.xml src/java/org/apache/nutch/crawl/InjectorJob.java src/java/org/apache/nutch/storage/StorageUtils.java

Author: lewismc
Date: Thu Nov 22 14:45:07 2012
New Revision: 1412566

URL: http://svn.apache.org/viewvc?rev=1412566&view=rev
Log:
NUTCH-1370 Expose exact number of urls injected @runtime

Modified:
    nutch/branches/2.x/conf/gora-cassandra-mapping.xml
    nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java
    nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java

Modified: nutch/branches/2.x/conf/gora-cassandra-mapping.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/conf/gora-cassandra-mapping.xml?rev=1412566&r1=1412565&r2=1412566&view=diff
==============================================================================
--- nutch/branches/2.x/conf/gora-cassandra-mapping.xml (original)
+++ nutch/branches/2.x/conf/gora-cassandra-mapping.xml Thu Nov 22 14:45:07 2012
@@ -55,4 +55,15 @@
         <field name="protocolStatus" family="sc" qualifier="prs"/>
     </class>
     
+    <keyspace name="host" cluster="Test Cluster" host="localhost">
+        <family name="mtdt" type="super"/>
+        <family name="il" type="super"/>
+        <family name="ol" type="super"/>
+    </keyspace>
+    <class keyClass="java.lang.String" name="org.apache.nutch.storage.Host" keyspace="host">
+        <field name="metadata" family="mtdt" qualifier="mtdt"/>
+        <field name="inlinks" family="il" qualifier="il"/>
+        <field name="outlinks" family="ol" qualifier="ol"/>
+    </class>
+    
 </gora-orm>

Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java?rev=1412566&r1=1412565&r2=1412566&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java Thu Nov 22 14:45:07 2012
@@ -18,6 +18,7 @@ package org.apache.nutch.crawl;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -26,6 +27,7 @@ import java.util.TreeMap;
 
 import org.apache.avro.util.Utf8;
 import org.apache.gora.mapreduce.GoraOutputFormat;
+import org.apache.gora.persistency.Persistent;
 import org.apache.gora.store.DataStore;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -49,6 +51,7 @@ import org.apache.nutch.util.NutchConfig
 import org.apache.nutch.util.NutchJob;
 import org.apache.nutch.util.NutchTool;
 import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.TimingUtil;
 import org.apache.nutch.util.ToolUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +72,7 @@ public class InjectorJob extends NutchTo
   private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
 
   private static final Utf8 YES_STRING = new Utf8("y");
-
+  
   static {
     FIELDS.add(WebPage.Field.MARKERS);
     FIELDS.add(WebPage.Field.STATUS);
@@ -81,7 +84,7 @@ public class InjectorJob extends NutchTo
    * metadata key reserved for setting a custom fetchInterval for a specific URL
    */
   public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
-
+  
   public static class UrlMapper extends
       Mapper<LongWritable, Text, String, WebPage> {
     private URLNormalizers urlNormalizers;
@@ -92,24 +95,27 @@ public class InjectorJob extends NutchTo
     private long curTime;
 
     @Override
-    protected void setup(Context context) throws IOException,
-        InterruptedException {
+    protected void setup(Context context) throws IOException, InterruptedException {
       urlNormalizers = new URLNormalizers(context.getConfiguration(),
-          URLNormalizers.SCOPE_INJECT);
+        URLNormalizers.SCOPE_INJECT);
       interval = context.getConfiguration().getInt("db.fetch.interval.default",
-          2592000);
+        2592000);
       filters = new URLFilters(context.getConfiguration());
       scfilters = new ScoringFilters(context.getConfiguration());
       scoreInjected = context.getConfiguration().getFloat("db.score.injected",
-          1.0f);
+        1.0f);
       curTime = context.getConfiguration().getLong("injector.current.time",
-          System.currentTimeMillis());
+        System.currentTimeMillis());
     }
 
-    @Override
     protected void map(LongWritable key, Text value, Context context)
         throws IOException, InterruptedException {
-      String url = value.toString();
+      String url = value.toString(); // value is line of text
+      
+      if (url != null && url.trim().startsWith("#")) {
+        /* Ignore lines that start with # */
+        return;
+      }
 
       // if tabs : metadata that could be stored
       // must be name=value and separated by \t
@@ -149,10 +155,11 @@ public class InjectorJob extends NutchTo
         LOG.warn("Skipping " + url + ":" + e);
         url = null;
       }
-      if (url == null)
+      if (url == null) {
+        context.getCounter("injector", "urls_filtered").increment(1);
         return;
-
-      String reversedUrl = TableUtil.reverseUrl(url);
+      } else {                                         // if it passes
+      String reversedUrl = TableUtil.reverseUrl(url);  // collect it
       WebPage row = new WebPage();
       row.setFetchTime(curTime);
       row.setFetchInterval(customInterval);
@@ -160,34 +167,33 @@ public class InjectorJob extends NutchTo
       // now add the metadata
       Iterator<String> keysIter = metadata.keySet().iterator();
       while (keysIter.hasNext()) {
-          String keymd = keysIter.next();
-          String valuemd = metadata.get(keymd);
-          row.putToMetadata(new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes()));
+        String keymd = keysIter.next();
+        String valuemd = metadata.get(keymd);
+        row.putToMetadata(new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes()));
       }
 
       if (customScore != -1)
-    	  row.setScore(customScore);
+        row.setScore(customScore);
       else
-    	  row.setScore(scoreInjected);
+        row.setScore(scoreInjected);
 
       try {
-    	  scfilters.injectedScore(url, row);
+        scfilters.injectedScore(url, row);
       } catch (ScoringFilterException e) {
-    	  if (LOG.isWarnEnabled()) {
-    		  LOG.warn("Cannot filter injected score for url " + url
-    				  + ", using default (" + e.getMessage() + ")");
-    	  }
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Cannot filter injected score for url " + url
+          + ", using default (" + e.getMessage() + ")");
+        }
       }
-      
+      context.getCounter("injector", "urls_injected").increment(1);
       row.putToMarkers(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0)));
-
       Mark.INJECT_MARK.putMark(row, YES_STRING);
       context.write(reversedUrl, row);
     }
+    }
   }
-
+  
   public InjectorJob() {
-
   }
 
   public InjectorJob(Configuration conf) {
@@ -211,21 +217,40 @@ public class InjectorJob extends NutchTo
     currentJob.setMapOutputKeyClass(String.class);
     currentJob.setMapOutputValueClass(WebPage.class);
     currentJob.setOutputFormatClass(GoraOutputFormat.class);
+    
     DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(),
-        String.class, WebPage.class);
+      String.class, WebPage.class);
     GoraOutputFormat.setOutput(currentJob, store, true);
+    
+    // NUTCH-1471 Make explicit which datastore class we use
+    Class<? extends DataStore<Object, Persistent>> dataStoreClass = 
+      StorageUtils.getDataStoreClass(currentJob.getConfiguration());
+    LOG.info("InjectorJob: Using " + dataStoreClass + " as the Gora storage class.");
+    
     currentJob.setReducerClass(Reducer.class);
     currentJob.setNumReduceTasks(0);
+    
     currentJob.waitForCompletion(true);
     ToolUtil.recordJobStatus(null, currentJob, results);
+
+    // NUTCH-1370 Make explicit #URLs injected @runtime
+    long urlsInjected = currentJob.getCounters().findCounter("injector", "urls_injected").getValue();
+    long urlsFiltered = currentJob.getCounters().findCounter("injector", "urls_filtered").getValue();
+    LOG.info("InjectorJob: total number of urls rejected by filters: " + urlsFiltered);
+    LOG.info("InjectorJob: total number of urls injected after normalization and filtering: "
+        + urlsInjected);
+
     return results;
   }
 
   public void inject(Path urlDir) throws Exception {
-    LOG.info("InjectorJob: starting");
-    LOG.info("InjectorJob: urlDir: " + urlDir);
-
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("InjectorJob: starting at " + sdf.format(start));
+    LOG.info("InjectorJob: Injecting urlDir: " + urlDir); 
     run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir));
+    long end = System.currentTimeMillis();
+    LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
   }
 
   @Override
@@ -246,7 +271,6 @@ public class InjectorJob extends NutchTo
 
     try {
       inject(new Path(args[0]));
-      LOG.info("InjectorJob: finished");
       return -0;
     } catch (Exception e) {
       LOG.error("InjectorJob: " + StringUtils.stringifyException(e));

Modified: nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java?rev=1412566&r1=1412565&r2=1412566&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java Thu Nov 22 14:45:07 2012
@@ -75,9 +75,16 @@ public class StorageUtils {
     return DataStoreFactory.createDataStore(dataStoreClass,
             keyClass, persistentClass, conf, schema);
   }
-
+  
+  /**
+   * Return the Persistent Gora class used to persist Nutch Web data.
+   * 
+   * @param conf the Nutch configuration
+   * @return the Gora DataStore persistent class
+   * @throws ClassNotFoundException
+   */
   @SuppressWarnings("unchecked")
-  private static <K, V extends Persistent> Class<? extends DataStore<K, V>>
+  public static <K, V extends Persistent> Class<? extends DataStore<K, V>>
   getDataStoreClass(Configuration conf)  throws ClassNotFoundException {
     return (Class<? extends DataStore<K, V>>)
       Class.forName(conf.get("storage.data.store.class",