Posted to commits@nutch.apache.org by jn...@apache.org on 2010/06/30 12:36:29 UTC

svn commit: r959259 [4/12] - in /nutch/branches/nutchbase: ./ bin/ conf/ contrib/ docs/ ivy/ lib/ lib/jetty-ext/ src/engines/ src/gora/ src/java/ src/java/org/apache/nutch/analysis/ src/java/org/apache/nutch/clustering/ src/java/org/apache/nutch/crawl/...

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java Wed Jun 30 10:36:20 2010
@@ -3,75 +3,79 @@ package org.apache.nutch.crawl;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.nutch.crawl.Generator.SelectorEntry;
+import org.apache.nutch.crawl.GeneratorJob.SelectorEntry;
 import org.apache.nutch.net.URLFilterException;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.scoring.ScoringFilterException;
 import org.apache.nutch.scoring.ScoringFilters;
-import org.apache.nutch.util.hbase.WebTableRow;
-import org.apache.nutch.util.hbase.TableUtil;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.TableUtil;
+import org.gora.mapreduce.GoraMapper;
 
-public class GeneratorMapper 
-extends TableMapper<SelectorEntry, WebTableRow> {
+public class GeneratorMapper
+extends GoraMapper<String, WebPage, SelectorEntry, WebPage> {
 
   private URLFilters filters;
   private URLNormalizers normalizers;
   private boolean filter;
+  private boolean normalise;
   private FetchSchedule schedule;
   private ScoringFilters scoringFilters;
   private long curTime;
 
   @Override
-  public void map(ImmutableBytesWritable key, Result result,
+  public void map(String reversedUrl, WebPage page,
       Context context) throws IOException, InterruptedException {
-    String reversedUrl = Bytes.toString(key.get());
     String url = TableUtil.unreverseUrl(reversedUrl);
 
-    WebTableRow row = new WebTableRow(result);
+    if (Mark.GENERATE_MARK.checkMark(page) != null) {
+      if (GeneratorJob.LOG.isDebugEnabled()) {
+        GeneratorJob.LOG.debug("Skipping " + url + "; already generated");
+      }
+      return;
+    }
 
     // If filtering is on don't generate URLs that don't pass URLFilters
     try {
-      url = normalizers.normalize(url, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+      if (normalise) {
+        url = normalizers.normalize(url, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+      }
       if (filter && filters.filter(url) == null)
         return;
     } catch (URLFilterException e) {
-      Generator.LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
+      GeneratorJob.LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
       return;
     }
 
     // check fetch schedule
-    if (!schedule.shouldFetch(url, row, curTime)) {
-      if (Generator.LOG.isDebugEnabled()) {
-        Generator.LOG.debug("-shouldFetch rejected '" + url + "', fetchTime=" + 
-            row.getFetchTime() + ", curTime=" + curTime);
+    if (!schedule.shouldFetch(url, page, curTime)) {
+      if (GeneratorJob.LOG.isDebugEnabled()) {
+        GeneratorJob.LOG.debug("-shouldFetch rejected '" + url + "', fetchTime=" +
+            page.getFetchTime() + ", curTime=" + curTime);
       }
       return;
     }
-    
-    float score = row.getScore();
+    float score = page.getScore();
     try {
-      score = scoringFilters.generatorSortValue(url, row, score);
-    } catch (ScoringFilterException e) { 
-      // ignore
+      score = scoringFilters.generatorSortValue(url, page, score);
+    } catch (ScoringFilterException e) {
+      //ignore
     }
     SelectorEntry entry = new SelectorEntry(url, score);
-    context.write(entry, row);
-
+    context.write(entry, page);
   }
 
   @Override
   public void setup(Context context) {
     Configuration conf = context.getConfiguration();
     filters = new URLFilters(conf);
-    curTime = conf.getLong(Generator.CRAWL_GEN_CUR_TIME,
-        System.currentTimeMillis());
-    normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
-    filter = conf.getBoolean(Generator.CRAWL_GENERATE_FILTER, true);
+    curTime =
+      conf.getLong(GeneratorJob.GENERATOR_CUR_TIME, System.currentTimeMillis());
+    normalizers =
+      new URLNormalizers(conf, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+    filter = conf.getBoolean(GeneratorJob.GENERATOR_FILTER, true);
+    normalise = conf.getBoolean(GeneratorJob.GENERATOR_NORMALISE, true);
     schedule = FetchScheduleFactory.getFetchSchedule(conf);
     scoringFilters = new ScoringFilters(conf);
   }

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java Wed Jun 30 10:36:20 2010
@@ -3,51 +3,62 @@ package org.apache.nutch.crawl;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Random;
 
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.mapreduce.TableReducer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.nutch.crawl.Generator.SelectorEntry;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.crawl.GeneratorJob.SelectorEntry;
+import org.apache.nutch.fetcher.FetcherJob.FetcherMapper;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.URLUtil;
+import org.gora.mapreduce.GoraReducer;
 
 /** Reduce class for generate
- * 
+ *
 * The #reduce() method writes a random integer to all generated URLs. This random
  * number is then used by {@link FetcherMapper}.
  *
  */
 public class GeneratorReducer
-extends TableReducer<SelectorEntry, WebTableRow, SelectorEntry> {
+extends GoraReducer<SelectorEntry, WebPage, String, WebPage> {
 
   private long limit;
-  private long maxPerHost;
+  private long maxCount;
   private long count = 0;
+  private boolean byDomain = false;
   private Map<String, Integer> hostCountMap = new HashMap<String, Integer>();
-  private Random random = new Random();
+  private Utf8 crawlId;
 
   @Override
-  protected void reduce(SelectorEntry key, Iterable<WebTableRow> values,
+  protected void reduce(SelectorEntry key, Iterable<WebPage> values,
       Context context) throws IOException, InterruptedException {
-    for (WebTableRow row : values) {
-      if (maxPerHost > 0) {
-        String host = key.host;
-        Integer hostCount = hostCountMap.get(host);
+    for (WebPage page : values) {
+      if (maxCount > 0) {
+        String hostordomain;
+        if (byDomain) {
+          hostordomain = URLUtil.getDomainName(key.url);
+        } else {
+          hostordomain = URLUtil.getHost(key.url);
+        }
+
+        Integer hostCount = hostCountMap.get(hostordomain);
         if (hostCount == null) {
-          hostCountMap.put(host, 0);
+          hostCountMap.put(hostordomain, 0);
           hostCount = 0;
         }
-        if (hostCount > maxPerHost) {
+        if (hostCount >= maxCount) {
           return;
         }
-        hostCountMap.put(host, hostCount + 1);
+        hostCountMap.put(hostordomain, hostCount + 1);
       }
       if (count >= limit) {
         return;
       }
-      
-      row.putMeta(Generator.GENERATOR_MARK, Bytes.toBytes(random.nextInt()));
-      row.makeRowMutation().writeToContext(key, context);
+
+      Mark.GENERATE_MARK.putMark(page, crawlId);
+      context.write(TableUtil.reverseUrl(key.url), page);
+      context.getCounter("Generator", "GENERATE_MARK").increment(1);
       count++;
     }
   }
@@ -56,13 +67,20 @@ extends TableReducer<SelectorEntry, WebT
   protected void setup(Context context)
       throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
-    long totalLimit = conf.getLong(Generator.CRAWL_TOP_N, Long.MAX_VALUE);
+    long totalLimit = conf.getLong(GeneratorJob.GENERATOR_TOP_N, Long.MAX_VALUE);
     if (totalLimit == Long.MAX_VALUE) {
-      limit = Long.MAX_VALUE; 
+      limit = Long.MAX_VALUE;
     } else {
       limit = totalLimit / context.getNumReduceTasks();
     }
-    maxPerHost = conf.getLong(Generator.GENERATE_MAX_PER_HOST, -1);
+    maxCount = conf.getLong(GeneratorJob.GENERATOR_MAX_COUNT, -2);
+    crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID));
+    String countMode =
+      conf.get(GeneratorJob.GENERATOR_COUNT_MODE, GeneratorJob.GENERATOR_COUNT_VALUE_HOST);
+    if (countMode.equals(GeneratorJob.GENERATOR_COUNT_VALUE_DOMAIN)) {
+      byDomain = true;
+    }
+
   }
-  
+
 }
\ No newline at end of file
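
The host/domain throttling that replaces the old maxPerHost logic in GeneratorReducer boils down to counting selected entries per host (or per domain) and dropping everything past GeneratorJob.GENERATOR_MAX_COUNT. Below is a minimal standalone sketch of that counting step; the class name, URLs and the crude host extraction are made up for illustration and stand in for the real SelectorEntry/URLUtil types.

import java.util.HashMap;
import java.util.Map;

/** Standalone sketch of the per-host throttling in GeneratorReducer (illustrative only). */
public class MaxCountSketch {

  // crude stand-in for URLUtil.getHost(); the real reducer can also key by domain name
  static String hostOf(String url) {
    return url.replaceFirst("https?://", "").split("/")[0];
  }

  public static void main(String[] args) {
    long maxCount = 2; // plays the role of GeneratorJob.GENERATOR_MAX_COUNT
    Map<String, Integer> hostCountMap = new HashMap<String, Integer>();
    String[] urls = {
        "http://example.org/a", "http://example.org/b",
        "http://example.org/c", "http://example.net/a" };
    for (String url : urls) {
      String host = hostOf(url);
      Integer hostCount = hostCountMap.get(host);
      if (hostCount == null) {
        hostCount = 0;
      }
      if (hostCount >= maxCount) {
        System.out.println("dropped  " + url); // corresponds to the reducer's early return
        continue;
      }
      hostCountMap.put(host, hostCount + 1);
      System.out.println("selected " + url);   // the reducer would put GENERATE_MARK and write the page
    }
  }
}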

Added: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/InjectorJob.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/InjectorJob.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/InjectorJob.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/InjectorJob.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,249 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.gora.mapreduce.GoraMapper;
+import org.gora.mapreduce.GoraOutputFormat;
+
+/** This class takes a flat file of URLs and adds them to the table of pages to be
+ * crawled.  Useful for bootstrapping the system.
+ * The URL files contain one URL per line, optionally followed by custom metadata
+ * separated by tabs, with the metadata key separated from the corresponding value by '='. <br>
+ * Note that some metadata keys are reserved: <br>
+ * - <i>nutch.score</i> : allows setting a custom score for a specific URL <br>
+ * - <i>nutch.fetchInterval</i> : allows setting a custom fetch interval for a specific URL <br>
+ * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
+ **/
+public class InjectorJob extends GoraMapper<String, WebPage, String, WebPage>
+    implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(InjectorJob.class);
+
+  private Configuration conf;
+
+  private FetchSchedule schedule;
+
+  private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
+  private static final Utf8 YES_STRING = new Utf8("y");
+
+  static {
+    FIELDS.add(WebPage.Field.MARKERS);
+    FIELDS.add(WebPage.Field.STATUS);
+  }
+
+  /** metadata key reserved for setting a custom score for a specific URL */
+  public static String nutchScoreMDName = "nutch.score";
+  /**
+   * metadata key reserved for setting a custom fetchInterval for a specific URL
+   */
+  public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
+
+  public static class UrlMapper extends
+      Mapper<LongWritable, Text, String, WebPage> {
+    private URLNormalizers urlNormalizers;
+    private int interval;
+    private float scoreInjected;
+    private URLFilters filters;
+    private ScoringFilters scfilters;
+    private long curTime;
+
+    @Override
+    protected void setup(Context context) throws IOException,
+        InterruptedException {
+      urlNormalizers = new URLNormalizers(context.getConfiguration(),
+          URLNormalizers.SCOPE_INJECT);
+      interval = context.getConfiguration().getInt("db.fetch.interval.default",
+          2592000);
+      filters = new URLFilters(context.getConfiguration());
+      scfilters = new ScoringFilters(context.getConfiguration());
+      scoreInjected = context.getConfiguration().getFloat("db.score.injected",
+          1.0f);
+      curTime = context.getConfiguration().getLong("injector.current.time",
+          System.currentTimeMillis());
+    }
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      String url = value.toString();
+
+      // if the line contains tabs, the remainder is metadata that may be stored:
+      // entries must be name=value and separated by \t
+      float customScore = -1f;
+      int customInterval = interval;
+      Map<String, String> metadata = new TreeMap<String, String>();
+      if (url.indexOf("\t") != -1) {
+        String[] splits = url.split("\t");
+        url = splits[0];
+        for (int s = 1; s < splits.length; s++) {
+          // find separation between name and value
+          int indexEquals = splits[s].indexOf("=");
+          if (indexEquals == -1) {
+            // skip anything without a =
+            continue;
+          }
+          String metaname = splits[s].substring(0, indexEquals);
+          String metavalue = splits[s].substring(indexEquals + 1);
+          if (metaname.equals(nutchScoreMDName)) {
+            try {
+              customScore = Float.parseFloat(metavalue);
+            } catch (NumberFormatException nfe) {
+            }
+          } else if (metaname.equals(nutchFetchIntervalMDName)) {
+            try {
+              customInterval = Integer.parseInt(metavalue);
+            } catch (NumberFormatException nfe) {
+            }
+          } else
+            metadata.put(metaname, metavalue);
+        }
+      }
+      try {
+        url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
+        url = filters.filter(url); // filter the url
+      } catch (Exception e) {
+        LOG.warn("Skipping " + url + ":" + e);
+        url = null;
+      }
+      if (url == null)
+        return;
+
+      String reversedUrl = TableUtil.reverseUrl(url);
+      WebPage row = new WebPage();
+      row.setFetchTime(curTime);
+      row.setFetchInterval(customInterval);
+      if (customScore != -1)
+        row.setScore(customScore);
+      else {
+        row.setScore(scoreInjected);
+        try {
+          scfilters.injectedScore(url, row);
+        } catch (ScoringFilterException e) {
+          if (LOG.isWarnEnabled()) {
+            LOG.warn("Cannot filter injected score for url " + url
+                + ", using default (" + e.getMessage() + ")");
+          }
+          row.setScore(scoreInjected);
+        }
+      }
+      // now add the metadata
+      Iterator<String> keysIter = metadata.keySet().iterator();
+      while (keysIter.hasNext()) {
+        String keymd = keysIter.next();
+        String valuemd = metadata.get(keymd);
+        row.putToMetadata(new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes()));
+      }
+      Mark.INJECT_MARK.putMark(row, YES_STRING);
+      context.write(reversedUrl, row);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void setup(Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    schedule = FetchScheduleFactory.getFetchSchedule(conf);
+    // scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+  }
+
+  @Override
+  protected void map(String key, WebPage row, Context context)
+      throws IOException, InterruptedException {
+    if (Mark.INJECT_MARK.checkMark(row) == null) {
+      return;
+    }
+    Mark.INJECT_MARK.removeMark(row);
+    if (!row.isReadable(WebPage.Field.STATUS.getIndex())) {
+      row.setStatus(CrawlStatus.STATUS_UNFETCHED);
+      schedule.initializeSchedule(key, row);
+      // row.setScore(scoreInjected);
+    }
+
+    context.write(key, row);
+  }
+
+  void inject(Path urlDir) throws Exception {
+    LOG.info("InjectorJob: starting");
+    LOG.info("InjectorJob: urlDir: " + urlDir);
+
+    getConf().setLong("injector.current.time", System.currentTimeMillis());
+    Job job = new NutchJob(getConf(), "inject-p1 " + urlDir);
+    FileInputFormat.addInputPath(job, urlDir);
+    job.setMapperClass(UrlMapper.class);
+    job.setMapOutputKeyClass(String.class);
+    job.setMapOutputValueClass(WebPage.class);
+    job.setOutputFormatClass(GoraOutputFormat.class);
+    GoraOutputFormat.setOutput(job, String.class,
+        WebPage.class, StorageUtils.getDataStoreClass(getConf()), true);
+    job.setReducerClass(Reducer.class);
+    job.setNumReduceTasks(0);
+    job.waitForCompletion(true);
+
+    job = new NutchJob(getConf(), "inject-p2 " + urlDir);
+    StorageUtils.initMapperJob(job, FIELDS, String.class, WebPage.class,
+        InjectorJob.class);
+    job.setNumReduceTasks(0);
+    job.waitForCompletion(true);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err.println("Usage: InjectorJob <url_dir>");
+      return -1;
+    }
+    try {
+      inject(new Path(args[0]));
+      LOG.info("InjectorJob: finished");
+      return -0;
+    } catch (Exception e) {
+      LOG.fatal("InjectorJob: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new InjectorJob(), args);
+    System.exit(res);
+  }
+}
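
The seed-file format described in the class javadoc (a URL, then optional tab-separated name=value metadata such as nutch.score and nutch.fetchInterval) is parsed in UrlMapper.map() above. A self-contained sketch of just that parsing step, reusing the example line from the javadoc; the class name is hypothetical:

import java.util.Map;
import java.util.TreeMap;

/** Standalone sketch of how InjectorJob.UrlMapper splits a seed line (illustrative only). */
public class SeedLineSketch {
  public static void main(String[] args) {
    String line = "http://www.nutch.org/\tnutch.score=10\tnutch.fetchInterval=2592000\tuserType=open_source";
    String url = line;
    Map<String, String> metadata = new TreeMap<String, String>();
    if (line.indexOf("\t") != -1) {
      String[] splits = line.split("\t");
      url = splits[0];
      for (int s = 1; s < splits.length; s++) {
        int indexEquals = splits[s].indexOf("=");
        if (indexEquals == -1) {
          continue; // entries without '=' are skipped, as in UrlMapper
        }
        metadata.put(splits[s].substring(0, indexEquals),
            splits[s].substring(indexEquals + 1));
      }
    }
    // nutch.score and nutch.fetchInterval would be pulled out and applied to the WebPage row;
    // everything else ends up in the row's metadata map
    System.out.println("url      = " + url);
    System.out.println("metadata = " + metadata);
  }
}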

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java Wed Jun 30 10:36:20 2010
@@ -21,34 +21,31 @@ import java.util.Collection;
 import java.util.HashSet;
 
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.nutch.parse.Parse;
-import org.apache.nutch.util.hbase.HbaseColumn;
-import org.apache.nutch.util.hbase.WebTableColumns;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.storage.WebPage;
 
 /**
  * Default implementation of a page signature. It calculates an MD5 hash
  * of the raw binary content of a page. In case there is no content, it
  * calculates a hash from the page's URL.
- * 
+ *
  * @author Andrzej Bialecki &lt;ab@getopt.org&gt;
  */
 public class MD5Signature extends Signature {
 
-  private final static Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
-  
+  private final static Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
   static {
-    COLUMNS.add(new HbaseColumn(WebTableColumns.CONTENT));
+    FIELDS.add(WebPage.Field.CONTENT);
   }
-  
+
   @Override
-  public byte[] calculate(WebTableRow row, Parse parse) {
-    byte[] data = row.getContent();
+  public byte[] calculate(WebPage page) {
+    byte[] data = page.getContent().array();
     return MD5Hash.digest(data).getDigest();
   }
 
   @Override
-  public Collection<HbaseColumn> getColumns() {
-    return COLUMNS;
+  public Collection<WebPage.Field> getFields() {
+    return FIELDS;
   }
 }
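
As the javadoc says, the signature is simply an MD5 digest of the raw page content, with the URL as a fallback when there is no content (note that the new calculate() above digests page.getContent().array() directly). A hedged sketch of that documented rule using Hadoop's MD5Hash, with made-up inputs and a hypothetical class name:

import org.apache.hadoop.io.MD5Hash;

/** Sketch of the signature rule from the MD5Signature javadoc: digest the raw content,
 *  or fall back to the URL when there is no content (inputs here are made up). */
public class Md5SignatureSketch {

  static byte[] signature(byte[] content, String url) {
    byte[] data = (content != null && content.length > 0) ? content : url.getBytes();
    return MD5Hash.digest(data).getDigest();
  }

  public static void main(String[] args) {
    byte[] sig = signature(null, "http://example.org/"); // no content -> hash of the URL
    System.out.println(new MD5Hash(sig));                 // prints the digest as hex
  }
}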

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java Wed Jun 30 10:36:20 2010
@@ -20,12 +20,12 @@ import org.apache.hadoop.io.Writable;
 import org.apache.nutch.util.GenericWritableConfigurable;
 
 public class NutchWritable extends GenericWritableConfigurable {
-  
+
   private static Class<? extends Writable>[] CLASSES = null;
-  
+
   static {
     CLASSES = new Class[] {
-      org.apache.hadoop.io.NullWritable.class, 
+      org.apache.hadoop.io.NullWritable.class,
       org.apache.hadoop.io.LongWritable.class,
       org.apache.hadoop.io.BytesWritable.class,
       org.apache.hadoop.io.FloatWritable.class,
@@ -44,17 +44,16 @@ public class NutchWritable extends Gener
       org.apache.nutch.parse.ParseStatus.class,
       org.apache.nutch.protocol.Content.class,
       org.apache.nutch.protocol.ProtocolStatus.class,
-      org.apache.nutch.searcher.Hit.class,
-      org.apache.nutch.searcher.HitDetails.class,
-      org.apache.nutch.searcher.Hits.class,
+//      org.apache.nutch.searcher.Hit.class,
+//      org.apache.nutch.searcher.HitDetails.class,
+//      org.apache.nutch.searcher.Hits.class,
       org.apache.nutch.scoring.ScoreDatum.class,
-      org.apache.nutch.util.hbase.WebTableRow.class,
-      org.apache.hadoop.hbase.client.Result.class
+      org.apache.nutch.util.WebPageWritable.class,
     };
   }
 
   public NutchWritable() { }
-  
+
   public NutchWritable(Writable instance) {
     set(instance);
   }

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java Wed Jun 30 10:36:20 2010
@@ -19,14 +19,12 @@ package org.apache.nutch.crawl;
 
 import java.util.Collection;
 
-import org.apache.nutch.parse.Parse;
-import org.apache.nutch.util.hbase.HbaseColumn;
-import org.apache.nutch.util.hbase.WebTableRow;
 import org.apache.hadoop.conf.Configured;
+import org.apache.nutch.storage.WebPage;
 
 public abstract class Signature extends Configured {
-  
-  public abstract byte[] calculate(WebTableRow row, Parse parse);
-  
-  public abstract Collection<HbaseColumn> getColumns();
+
+  public abstract byte[] calculate(WebPage page);
+
+  public abstract Collection<WebPage.Field> getFields();
 }

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java Wed Jun 30 10:36:20 2010
@@ -22,17 +22,15 @@ import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-// Hadoop imports
 import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.ObjectCache;
-import org.apache.nutch.util.hbase.HbaseColumn;
 
 /**
  * Factory class, which instantiates a Signature implementation according to the
  * current Configuration configuration. This newly created instance is cached in the
  * Configuration instance, so that it could be later retrieved.
- * 
+ *
  * @author Andrzej Bialecki &lt;ab@getopt.org&gt;
  */
 public class SignatureFactory {
@@ -58,9 +56,9 @@ public class SignatureFactory {
     }
     return impl;
   }
-  
-  public static Collection<HbaseColumn> getColumns(Configuration conf) {
+
+  public static Collection<WebPage.Field> getFields(Configuration conf) {
     Signature impl = getSignature(conf);
-    return impl.getColumns();
+    return impl.getFields();
   }
 }

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java Wed Jun 30 10:36:20 2010
@@ -26,11 +26,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 
 import org.apache.hadoop.io.MD5Hash;
-
-import org.apache.nutch.parse.Parse;
-import org.apache.nutch.util.hbase.HbaseColumn;
-import org.apache.nutch.util.hbase.WebTableColumns;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.storage.WebPage;
 
 /**
  * <p>An implementation of a page signature. It calculates an MD5 hash
@@ -54,26 +50,27 @@ import org.apache.nutch.util.hbase.WebTa
  * in the order of decreasing frequency.</li>
  * </ul>
  * This list is then submitted to an MD5 hash calculation.
- * 
+ *
  * @author Andrzej Bialecki &lt;ab@getopt.org&gt;
  */
 public class TextProfileSignature extends Signature {
 
-  private final static Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
-  
+  private final static Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
   static {
-    COLUMNS.add(new HbaseColumn(WebTableColumns.CONTENT));
+    FIELDS.add(WebPage.Field.CONTENT);
   }
-  
+
   Signature fallback = new MD5Signature();
 
-  public byte[] calculate(WebTableRow row, Parse parse) {
+  @Override
+  public byte[] calculate(WebPage page) {
     int MIN_TOKEN_LEN = getConf().getInt("db.signature.text_profile.min_token_len", 2);
     float QUANT_RATE = getConf().getFloat("db.signature.text_profile.quant_rate", 0.01f);
     HashMap<String, Token> tokens = new HashMap<String, Token>();
     String text = null;
-    if (parse != null) text = parse.getText();
-    if (text == null || text.length() == 0) return fallback.calculate(row, parse);
+    if (page.getText() != null) text = page.getText().toString();
+    if (text == null || text.length() == 0) return fallback.calculate(page);
     StringBuffer curToken = new StringBuffer();
     int maxFreq = 0;
     for (int i = 0; i < text.length(); i++) {
@@ -138,24 +135,26 @@ public class TextProfileSignature extend
     return MD5Hash.digest(newText.toString()).getDigest();
   }
 
-  public Collection<HbaseColumn> getColumns() {
-    return COLUMNS;
+  @Override
+  public Collection<WebPage.Field> getFields() {
+    return FIELDS;
   }
-  
+
   private static class Token {
     public int cnt;
     public String val;
-    
+
     public Token(int cnt, String val) {
       this.cnt = cnt;
       this.val = val;
     }
-    
+
+    @Override
     public String toString() {
       return val + " " + cnt;
     }
   }
-  
+
   private static class TokenComparator implements Comparator<Token> {
     public int compare(Token t1, Token t2) {
       return t2.cnt - t1.cnt;

Added: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLPartitioner.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLPartitioner.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLPartitioner.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLPartitioner.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.UnknownHostException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.nutch.crawl.GeneratorJob.SelectorEntry;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Partitions URLs by host, domain name or IP, depending on the value of the
+ * parameter 'partition.url.mode', which can be 'byHost', 'byDomain' or 'byIP'.
+ */
+public class URLPartitioner
+extends Partitioner<SelectorEntry, WebPage>
+implements Configurable {
+  private static final Log LOG = LogFactory.getLog(URLPartitioner.class);
+
+  public static final String PARTITION_MODE_KEY = "partition.url.mode";
+
+  public static final String PARTITION_MODE_HOST = "byHost";
+  public static final String PARTITION_MODE_DOMAIN = "byDomain";
+  public static final String PARTITION_MODE_IP = "byIP";
+
+  private Configuration conf;
+
+  private int seed;
+  private URLNormalizers normalizers;
+  private String mode = PARTITION_MODE_HOST;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    seed = conf.getInt("partition.url.seed", 0);
+    mode = conf.get(PARTITION_MODE_KEY, PARTITION_MODE_HOST);
+    // check that the mode is known
+    if (!mode.equals(PARTITION_MODE_IP) && !mode.equals(PARTITION_MODE_DOMAIN)
+        && !mode.equals(PARTITION_MODE_HOST)) {
+      LOG.error("Unknown partition mode : " + mode + " - forcing to byHost");
+      mode = PARTITION_MODE_HOST;
+    }
+    normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_PARTITION);
+  }
+
+  public void setup(Configuration conf) {
+
+  }
+
+  @Override
+  public int getPartition(SelectorEntry key, WebPage value, int numReduceTasks) {
+    String urlString = key.url;
+    URL url = null;
+    int hashCode = urlString.hashCode();
+    try {
+      urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_PARTITION);
+      url = new URL(urlString);
+      hashCode = url.getHost().hashCode();
+    } catch (MalformedURLException e) {
+      LOG.warn("Malformed URL: '" + urlString + "'");
+    }
+
+    if (mode.equals(PARTITION_MODE_DOMAIN) && url != null) hashCode = URLUtil
+        .getDomainName(url).hashCode();
+    else if (mode.equals(PARTITION_MODE_IP)) {
+      try {
+        InetAddress address = InetAddress.getByName(url.getHost());
+        hashCode = address.getHostAddress().hashCode();
+      } catch (UnknownHostException e) {
+        GeneratorJob.LOG.info("Couldn't find IP for host: " + url.getHost());
+      }
+    }
+
+    // make hosts wind up in different partitions on different runs
+    hashCode ^= seed;
+
+    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
+  }
+}
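
The essence of getPartition() above is: hash the host (or the domain name / resolved IP, depending on partition.url.mode), XOR in the per-run seed, and map the result onto the reducer range. A stripped-down sketch of the default byHost case, without the normalization and URLUtil/DNS lookups; the class name and sample URLs are illustrative:

import java.net.MalformedURLException;
import java.net.URL;

/** Stripped-down sketch of URLPartitioner's byHost hashing (illustrative only). */
public class PartitionSketch {

  static int partitionFor(String urlString, int seed, int numReduceTasks) {
    int hashCode = urlString.hashCode(); // fallback if the URL does not parse
    try {
      hashCode = new URL(urlString).getHost().hashCode();
    } catch (MalformedURLException e) {
      // keep the plain string hash, like URLPartitioner after logging the warning
    }
    hashCode ^= seed; // partition.url.seed spreads hosts differently between runs
    return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    // two URLs on the same host always land in the same partition
    System.out.println(partitionFor("http://example.org/page", 0, 4));
    System.out.println(partitionFor("http://example.org/other", 0, 4));
  }
}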

Added: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLWebPage.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLWebPage.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLWebPage.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/URLWebPage.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,32 @@
+package org.apache.nutch.crawl;
+
+import org.apache.nutch.storage.WebPage;
+
+public class URLWebPage {
+
+  private String url;
+
+  private WebPage datum;
+
+  public URLWebPage(String url, WebPage datum) {
+    this.url = url;
+    this.datum = datum;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+  public void setUrl(String url) {
+    this.url = url;
+  }
+
+  public WebPage getDatum() {
+    return datum;
+  }
+
+  public void setDatum(WebPage datum) {
+    this.datum = datum;
+  }
+
+}

Added: nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/WebTableReader.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/WebTableReader.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/WebTableReader.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/WebTableReader.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,466 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+import java.util.regex.Pattern;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.gora.mapreduce.GoraMapper;
+import org.gora.query.Query;
+import org.gora.query.Result;
+import org.gora.store.DataStore;
+
+/**
+ * Displays information about the entries of the webtable
+ **/
+
+public class WebTableReader extends Configured implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(WebTableReader.class);
+
+  public static class WebTableStatMapper extends
+      GoraMapper<String, WebPage, Text, LongWritable> {
+    LongWritable COUNT_1 = new LongWritable(1);
+    private boolean sort = false;
+
+    public WebTableStatMapper() {
+    }
+
+    public void setup(Context context) {
+      sort = context.getConfiguration().getBoolean("db.reader.stats.sort",
+          false);
+    }
+
+    public void close() {
+    }
+
+    @Override
+    protected void map(
+        String key,
+        WebPage value,
+        org.apache.hadoop.mapreduce.Mapper<String, WebPage, Text, LongWritable>.Context context)
+        throws IOException, InterruptedException {
+      context.write(new Text("T"), COUNT_1);
+      context.write(new Text("status " + value.getStatus()), COUNT_1);
+      context.write(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
+      context.write(new Text("s"), new LongWritable(
+          (long) (value.getScore() * 1000.0)));
+      if (sort) {
+        URL u = new URL(TableUtil.unreverseUrl(key.toString()));
+        String host = u.getHost();
+        context.write(new Text("status " + value.getStatus() + " " + host),
+            COUNT_1);
+      }
+
+    }
+  }
+
+  public static class WebTableStatCombiner extends
+      Reducer<Text, LongWritable, Text, LongWritable> {
+    LongWritable val = new LongWritable();
+
+    public void setup(Context context) {
+    }
+
+    public void cleanup(Context context) {
+    }
+
+    @Override
+    public void reduce(
+        Text key,
+        Iterable<LongWritable> values,
+        org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)
+        throws IOException, InterruptedException {
+      val.set(0L);
+      Iterator<LongWritable> iter = values.iterator();
+      String k = ((Text) key).toString();
+      if (!k.equals("s")) {
+        while (iter.hasNext()) {
+          LongWritable cnt = iter.next();
+          val.set(val.get() + cnt.get());
+        }
+        context.write(key, val);
+      } else {
+        long total = 0;
+        long min = Long.MAX_VALUE;
+        long max = Long.MIN_VALUE;
+        while (iter.hasNext()) {
+          LongWritable cnt = iter.next();
+          if (cnt.get() < min)
+            min = cnt.get();
+          if (cnt.get() > max)
+            max = cnt.get();
+          total += cnt.get();
+        }
+        context.write(new Text("scn"), new LongWritable(min));
+        context.write(new Text("scx"), new LongWritable(max));
+        context.write(new Text("sct"), new LongWritable(total));
+      }
+    }
+
+  }
+
+  public static class WebTableStatReducer extends
+      Reducer<Text, LongWritable, Text, LongWritable> {
+
+    public void cleanup(Context context) {
+    }
+
+    @Override
+    protected void reduce(
+        Text key,
+        Iterable<LongWritable> values,
+        org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)
+        throws IOException, InterruptedException {
+      Iterator<LongWritable> iter = values.iterator();
+      String k = ((Text) key).toString();
+      if (k.equals("T")) {
+        // sum all values for this key
+        long sum = 0;
+        while (iter.hasNext()) {
+          sum += iter.next().get();
+        }
+        // output sum
+        context.write(key, new LongWritable(sum));
+      } else if (k.startsWith("status") || k.startsWith("retry")) {
+        LongWritable cnt = new LongWritable();
+        while (iter.hasNext()) {
+          LongWritable val = iter.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        context.write(key, cnt);
+      } else if (k.equals("scx")) {
+        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
+        while (iter.hasNext()) {
+          LongWritable val = iter.next();
+          if (cnt.get() < val.get())
+            cnt.set(val.get());
+        }
+        context.write(key, cnt);
+      } else if (k.equals("scn")) {
+        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
+        while (iter.hasNext()) {
+          LongWritable val = iter.next();
+          if (cnt.get() > val.get())
+            cnt.set(val.get());
+        }
+        context.write(key, cnt);
+      } else if (k.equals("sct")) {
+        LongWritable cnt = new LongWritable();
+        while (iter.hasNext()) {
+          LongWritable val = iter.next();
+          cnt.set(cnt.get() + val.get());
+        }
+        context.write(key, cnt);
+      }
+    }
+
+  }
+
+  public void processStatJob(boolean sort) throws IOException,
+      ClassNotFoundException, InterruptedException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("WebTable statistics start");
+    }
+
+    Path tmpFolder = new Path(getConf().get("mapred.temp.dir", ".")
+        + "stat_tmp" + System.currentTimeMillis());
+
+    Job job = new NutchJob(getConf(), "db_stats");
+
+    job.getConfiguration().setBoolean("db.reader.stats.sort", sort);
+
+    DataStore<String, WebPage> store = StorageUtils.createDataStore(job
+        .getConfiguration(), String.class, WebPage.class);
+    Query<String, WebPage> query = store.newQuery();
+    query.setFields(WebPage._ALL_FIELDS);
+
+    GoraMapper.initMapperJob(job, query, store, Text.class, LongWritable.class,
+        WebTableStatMapper.class, null, true);
+
+    job.setCombinerClass(WebTableStatCombiner.class);
+    job.setReducerClass(WebTableStatReducer.class);
+
+    FileOutputFormat.setOutputPath(job, tmpFolder);
+
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+
+    boolean success = job.waitForCompletion(true);
+
+    FileSystem fileSystem = FileSystem.get(getConf());
+
+    if (!success) {
+      fileSystem.delete(tmpFolder, true);
+      return;
+    }
+
+    Text key = new Text();
+    LongWritable value = new LongWritable();
+
+    SequenceFile.Reader[] readers = org.apache.hadoop.mapred.SequenceFileOutputFormat
+        .getReaders(getConf(), tmpFolder);
+
+    TreeMap<String, LongWritable> stats = new TreeMap<String, LongWritable>();
+    for (int i = 0; i < readers.length; i++) {
+      SequenceFile.Reader reader = readers[i];
+      while (reader.next(key, value)) {
+        String k = key.toString();
+        LongWritable val = stats.get(k);
+        if (val == null) {
+          val = new LongWritable();
+          if (k.equals("scx"))
+            val.set(Long.MIN_VALUE);
+          if (k.equals("scn"))
+            val.set(Long.MAX_VALUE);
+          stats.put(k, val);
+        }
+        if (k.equals("scx")) {
+          if (val.get() < value.get())
+            val.set(value.get());
+        } else if (k.equals("scn")) {
+          if (val.get() > value.get())
+            val.set(value.get());
+        } else {
+          val.set(val.get() + value.get());
+        }
+      }
+      reader.close();
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Statistics for WebTable: ");
+      LongWritable totalCnt = stats.get("T");
+      stats.remove("T");
+      LOG.info("TOTAL urls:\t" + totalCnt.get());
+      for (Map.Entry<String, LongWritable> entry : stats.entrySet()) {
+        String k = entry.getKey();
+        LongWritable val = entry.getValue();
+        if (k.equals("scn")) {
+          LOG.info("min score:\t" + (float) (val.get() / 1000.0f));
+        } else if (k.equals("scx")) {
+          LOG.info("max score:\t" + (float) (val.get() / 1000.0f));
+        } else if (k.equals("sct")) {
+          LOG.info("avg score:\t"
+              + (float) ((((double) val.get()) / totalCnt.get()) / 1000.0));
+        } else if (k.startsWith("status")) {
+          String[] st = k.split(" ");
+          int code = Integer.parseInt(st[1]);
+          if (st.length > 2)
+            LOG.info("   " + st[2] + " :\t" + val);
+          else
+            LOG.info(st[0] + " " + code + " ("
+                + CrawlDatum.getStatusName((byte) code) + "):\t" + val);
+        } else
+          LOG.info(k + ":\t" + val);
+      }
+    }
+    // removing the tmp folder
+    fileSystem.delete(tmpFolder, true);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("WebTable statistics: done");
+    }
+
+  }
+
+  /** Prints out the entry to the standard out **/
+  private void read(String key) throws ClassNotFoundException, IOException {
+    DataStore<String, WebPage> datastore = StorageUtils.createDataStore(getConf(),
+        String.class, WebPage.class);
+
+    Query<String, WebPage> query = datastore.newQuery();
+    String reversedUrl = TableUtil.reverseUrl(key);
+    query.setKey(reversedUrl);
+
+    Result<String, WebPage> result = datastore.execute(query);
+    boolean found = false;
+    // should happen only once
+    while (result.next()) {
+      WebPage page = result.get();
+      String skey = result.getKey();
+      // we should not get to this point but nevermind
+      if (page == null || skey == null)
+        break;
+      found = true;
+      String url = TableUtil.unreverseUrl(skey);
+      System.out.println(getPageRepresentation(url, page));
+    }
+    if (!found)
+      System.out.println(key + " not found");
+    result.close();
+    datastore.close();
+  }
+
+  /** Filters the entries from the table based on a regex **/
+  public static class WebTableRegexMapper extends
+      GoraMapper<String, WebPage, Text, Text> {
+
+    static final String regexParamName = "webtable.url.regex";
+
+    public WebTableRegexMapper() {
+    }
+
+    private Pattern regex = null;
+
+    @Override
+    protected void map(
+        String key,
+        WebPage value,
+        org.apache.hadoop.mapreduce.Mapper<String, WebPage, Text, Text>.Context context)
+        throws IOException, InterruptedException {
+      // checks whether the Key passes the regex
+      String url = TableUtil.unreverseUrl(key.toString());
+      if (regex.matcher(url).matches()) {
+        context.write(new Text(url),
+            new Text(getPageRepresentation(key, value)));
+      }
+    }
+
+    @Override
+    protected void setup(
+        org.apache.hadoop.mapreduce.Mapper<String, WebPage, Text, Text>.Context context)
+        throws IOException, InterruptedException {
+      regex = Pattern.compile(context.getConfiguration().get(regexParamName,
+          ".+"));
+    }
+
+  }
+
+  public void processDumpJob(String output, Configuration config, String regex)
+      throws IOException, ClassNotFoundException, InterruptedException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("WebTable dump: starting");
+    }
+
+    Path outFolder = new Path(output);
+    Job job = new NutchJob(getConf(), "db_dump");
+
+    job.getConfiguration().set(WebTableRegexMapper.regexParamName, regex);
+
+    DataStore<String, WebPage> store = StorageUtils.createDataStore(job
+        .getConfiguration(), String.class, WebPage.class);
+    Query<String, WebPage> query = store.newQuery();
+    query.setFields(WebPage._ALL_FIELDS);
+
+    GoraMapper.initMapperJob(job, query, store, Text.class, Text.class,
+        WebTableRegexMapper.class, null, true);
+
+    FileOutputFormat.setOutputPath(job, outFolder);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    boolean success = job.waitForCompletion(true);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("WebTable dump: done");
+    }
+  }
+
+  private static String getPageRepresentation(String key, WebPage page) {
+    StringBuffer sb = new StringBuffer();
+    sb.append(key).append("\n");
+    sb.append("baseUrl:\t" + page.getBaseUrl()).append("\n");
+    sb.append("status:\t").append(page.getStatus()).append(" (").append(
+        CrawlStatus.getName((byte) page.getStatus())).append(")\n");
+    sb.append("parse_status:\t" + page.getParseStatus()).append("\n");
+    sb.append("title:\t" + page.getTitle()).append("\n");
+    sb.append("score:\t" + page.getScore()).append("\n");
+
+    Map<Utf8, ByteBuffer> metadata = page.getMetadata();
+    if (metadata != null) {
+      Iterator<Entry<Utf8, ByteBuffer>> iterator = metadata.entrySet()
+          .iterator();
+      while (iterator.hasNext()) {
+        Entry<Utf8, ByteBuffer> entry = iterator.next();
+        sb.append("metadata " + entry.getKey().toString()).append(" : \t")
+            .append(Bytes.toString(entry.getValue().array())).append("\n");
+      }
+    }
+    return sb.toString();
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new WebTableReader(),
+        args);
+    System.exit(res);
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 1) {
+      System.err
+          .println("Usage: WebTableReader (-stats | -url [url] | -dump <out_dir> [-regex regex])");
+      System.err
+          .println("\t-stats [-sort] \tprint overall statistics to System.out");
+      System.err.println("\t\t[-sort]\tlist status sorted by host");
+      System.err
+          .println("\t-url <url>\tprint information on <url> to System.out");
+      System.err
+          .println("\t-dump <out_dir> [-regex regex]\tdump the webtable to a text file in <out_dir>");
+      System.err
+          .println("\t\t[-regex]\tfilter on the URL of the webtable entry");
+      return -1;
+    }
+    String param = null;
+    try {
+      for (int i = 0; i < args.length; i++) {
+        if (args[i].equals("-url")) {
+          param = args[++i];
+          read(param);
+          return 0;
+        } else if (args[i].equals("-stats")) {
+          boolean toSort = false;
+          if (i < args.length - 1 && "-sort".equals(args[i + 1])) {
+            toSort = true;
+            i++;
+          }
+          processStatJob(toSort);
+        } else if (args[i].equals("-dump")) {
+          param = args[++i];
+          String regex = ".+";
+          if (i < args.length - 1 && "-regex".equals(args[i + 1]))
+            regex = args[i = i + 2];
+          processDumpJob(param, getConf(), regex);
+        }
+      }
+    } catch (Exception e) {
+      LOG.fatal("WebTableReader: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+    return 0;
+  }
+
+}
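
Per the usage message in run(), the tool is driven with -stats, -url or -dump arguments. A small example of invoking it the same way main() does, via ToolRunner; the output directory and regex below are made up, and the wrapper class is hypothetical:

import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.WebTableReader;
import org.apache.nutch.util.NutchConfiguration;

/** Example of driving WebTableReader programmatically (paths and regex are made up). */
public class WebTableReaderExample {
  public static void main(String[] args) throws Exception {
    // print overall statistics, with the per-host breakdown enabled by -sort
    ToolRunner.run(NutchConfiguration.create(), new WebTableReader(),
        new String[] { "-stats", "-sort" });

    // dump entries whose URL matches a regex as text files under dump_out/
    ToolRunner.run(NutchConfiguration.create(), new WebTableReader(),
        new String[] { "-dump", "dump_out", "-regex", ".*nutch.*" });
  }
}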

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java Wed Jun 30 10:36:20 2010
@@ -4,46 +4,45 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.nutch.storage.WebPage;
+import org.gora.util.IOUtils;
 
-public class FetchEntry implements Writable {
+public class FetchEntry extends Configured implements Writable {
+
+  private String key;
+  private WebPage page;
 
-  private ImmutableBytesWritable key;
-  private Result row;
-  
   public FetchEntry() {
-    key = new ImmutableBytesWritable();
-    row = new Result();
-  }
-  
-  public FetchEntry(FetchEntry fe) {
-    this.key = new ImmutableBytesWritable(fe.key.get().clone());
+    super(null);
   }
-  
-  public FetchEntry(ImmutableBytesWritable key, Result row) {
+
+  public FetchEntry(Configuration conf, String key, WebPage page) {
+    super(conf);
     this.key = key;
-    this.row = row;
+    this.page = page;
   }
-  
+
   @Override
   public void readFields(DataInput in) throws IOException {
-    key.readFields(in);
-    row.readFields(in);
+    key = Text.readString(in);
+    page = IOUtils.deserialize(getConf(), in, null, WebPage.class);
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    key.write(out);
-    row.write(out);
+    Text.writeString(out, key);
+    IOUtils.serialize(getConf(), out, page, WebPage.class);
   }
 
-  public ImmutableBytesWritable getKey() {
+  public String getKey() {
     return key;
   }
 
-  public Result getRow() {
-    return row;
+  public WebPage getWebPage() {
+    return page;
   }
 }

Added: nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherJob.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherJob.java?rev=959259&view=auto
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherJob.java (added)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherJob.java Wed Jun 30 10:36:20 2010
@@ -0,0 +1,245 @@
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.apache.avro.util.Utf8;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.GeneratorJob;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.parse.ParserJob;
+import org.apache.nutch.protocol.ProtocolFactory;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.StorageUtils;
+import org.apache.nutch.storage.WebPage;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TableUtil;
+import org.gora.mapreduce.GoraMapper;
+
+/**
+ * Multi-threaded fetcher.
+ *
+ */
+public class FetcherJob implements Tool {
+
+  public static final String PROTOCOL_REDIR = "protocol";
+
+  public static final int PERM_REFRESH_TIME = 5;
+
+  public static final Utf8 REDIRECT_DISCOVERED = new Utf8("___rdrdsc__");
+
+  private static final Collection<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
+
+  static {
+    FIELDS.add(WebPage.Field.MARKERS);
+    FIELDS.add(WebPage.Field.REPR_URL);
+  }
+
+  /**
+   * <p>
+   * Mapper class for Fetcher.
+   * </p>
+   * <p>
+   * This class writes a random integer as its output key, while passing the actual
+   * key and value arguments through a {@link FetchEntry} instance.
+   * </p>
+   * <p>
+   * This approach (combined with the use of {@link PartitionUrlByHost}) makes
+   * sure that Fetcher is still polite while also randomizing the key order. If
+   * one host has a huge number of URLs in your table while other hosts have
+   * few, {@link FetcherReducer} will not be stuck on one host but can process URLs
+   * from other hosts as well.
+   * </p>
+   */
+  public static class FetcherMapper
+  extends GoraMapper<String, WebPage, IntWritable, FetchEntry> {
+
+    private boolean shouldContinue;
+
+    private Utf8 crawlId;
+
+    private Random random = new Random();
+
+    @Override
+    protected void setup(Context context) {
+      Configuration conf = context.getConfiguration();
+      shouldContinue = conf.getBoolean("job.continue", false);
+      crawlId = new Utf8(conf.get(GeneratorJob.CRAWL_ID, Nutch.ALL_CRAWL_ID_STR));
+    }
+
+    @Override
+    protected void map(String key, WebPage page, Context context)
+        throws IOException, InterruptedException {
+      Utf8 mark = Mark.GENERATE_MARK.checkMark(page);
+      if (!NutchJob.shouldProcess(mark, crawlId)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different crawl id");
+        }
+        return;
+      }
+      if (shouldContinue && Mark.FETCH_MARK.checkMark(page) != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; already fetched");
+        }
+        return;
+      }
+      context.write(new IntWritable(random.nextInt(65536)), new FetchEntry(context
+          .getConfiguration(), key, page));
+    }
+  }
+
+  public static final Log LOG = LogFactory.getLog(FetcherJob.class);
+
+  private Configuration conf;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Collection<WebPage.Field> getFields(Job job) {
+    Collection<WebPage.Field> fields = new HashSet<WebPage.Field>(FIELDS);
+    if (job.getConfiguration().getBoolean("fetcher.parse", true)) {
+      ParserJob parserJob = new ParserJob();
+      fields.addAll(parserJob.getFields(job));
+    }
+    ProtocolFactory protocolFactory = new ProtocolFactory(job.getConfiguration());
+    fields.addAll(protocolFactory.getFields());
+
+    return fields;
+  }
+
+  private void fetch(int threads, String crawlId, boolean shouldContinue, boolean isParsing)
+  throws Exception {
+    LOG.info("FetcherJob: starting");
+
+    checkConfiguration();
+
+    if (threads > 0) {
+      getConf().setInt("fetcher.threads.fetch", threads);
+    }
+    getConf().set(GeneratorJob.CRAWL_ID, crawlId);
+    getConf().setBoolean("fetcher.parse", isParsing);
+    getConf().setBoolean("job.continue", shouldContinue);
+    
+    // set the actual time for the timelimit relative
+    // to the beginning of the whole job and not of a specific task
+    // otherwise it keeps trying again if a task fails
+    long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
+    if (timelimit != -1) {
+      timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
+      getConf().setLong("fetcher.timelimit", timelimit);
+    }
+
+    LOG.info("FetcherJob : timelimit set for : " + timelimit);
+    LOG.info("FetcherJob: threads: " + getConf().getInt("fetcher.threads.fetch", 10));
+    LOG.info("FetcherJob: parsing: " + getConf().getBoolean("fetcher.parse", true));
+    LOG.info("FetcherJob: continuing: " + getConf().getBoolean("job.continue", false));
+    if (crawlId.equals(Nutch.ALL_CRAWL_ID_STR)) {
+      LOG.info("FetcherJob: fetching all");
+    } else {
+      LOG.info("FetcherJob: crawlId: " + crawlId);
+    }
+
+    Job job = new NutchJob(getConf(), "fetch");
+
+    Collection<WebPage.Field> fields = getFields(job);
+    StorageUtils.initMapperJob(job, fields, IntWritable.class,
+        FetchEntry.class, FetcherMapper.class, PartitionUrlByHost.class, false);
+    StorageUtils.initReducerJob(job, FetcherReducer.class);
+
+    job.waitForCompletion(true);
+
+    LOG.info("FetcherJob: done");
+  }
+
+  private void checkConfiguration() {
+
+    // ensure that a value has been set for the agent name and that that
+    // agent name is the first value in the agents we advertise for robot
+    // rules parsing
+    String agentName = getConf().get("http.agent.name");
+    if (agentName == null || agentName.trim().length() == 0) {
+      String message = "Fetcher: No agents listed in 'http.agent.name'"
+          + " property.";
+      if (LOG.isFatalEnabled()) {
+        LOG.fatal(message);
+      }
+      throw new IllegalArgumentException(message);
+    } else {
+
+      // get all of the agents that we advertise
+      String agentNames = getConf().get("http.robots.agents");
+      StringTokenizer tok = new StringTokenizer(agentNames, ",");
+      ArrayList<String> agents = new ArrayList<String>();
+      while (tok.hasMoreTokens()) {
+        agents.add(tok.nextToken().trim());
+      }
+
+      // if the first one is not equal to our agent name, log a warning so the
+      // misconfiguration can be noticed (we do not fail the job for this)
+      if (!(agents.get(0)).equalsIgnoreCase(agentName)) {
+        String message = "Fetcher: Your 'http.agent.name' value should be "
+            + "listed first in 'http.robots.agents' property.";
+        LOG.warn(message);
+      }
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    int threads = -1;
+    boolean shouldContinue = false;
+    boolean isParsing = true;
+    String crawlId;
+
+    String usage = "Usage: FetcherJob (<crawl id> | -all) [-threads N] [-noParsing] [-continue]";
+
+    if (args.length == 0) {
+      System.err.println(usage);
+      return 1;
+    }
+
+    crawlId = args[0];
+    if (crawlId.equals("-threads") || crawlId.equals("-continue") || crawlId.equals("-noParsing")) {
+      System.err.println(usage);
+      return 1;
+    }
+    for (int i = 1; i < args.length; i++) {
+      if ("-threads".equals(args[i])) {
+        // found -threads option
+        threads = Integer.parseInt(args[++i]);
+      } else if ("-continue".equals(args[i])) {
+        shouldContinue = true;
+      } else if ("-noParsing".equals(args[i])) {
+        isParsing = false;
+      }
+    }
+
+    fetch(threads, crawlId, shouldContinue, isParsing); // run the Fetcher
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new FetcherJob(), args);
+    System.exit(res);
+  }
+}
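
The FetcherMapper javadoc in the file above explains the politeness trick: the
map output key is a random integer, the real key/value pair travels inside a
FetchEntry, and PartitionUrlByHost routes every entry of a host to the same
reducer. The sketch below only illustrates that idea and is not the committed
PartitionUrlByHost (its actual diff appears further down); the class name
HostPartitionerSketch is hypothetical, and it assumes the class sits in
org.apache.nutch.fetcher so that FetchEntry is visible without an import.

    import java.net.MalformedURLException;
    import java.net.URL;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.nutch.util.TableUtil;

    public class HostPartitionerSketch extends Partitioner<IntWritable, FetchEntry> {

      @Override
      public int getPartition(IntWritable key, FetchEntry value, int numPartitions) {
        // FetchEntry.getKey() carries the reversed URL written by FetcherMapper.
        String urlString = TableUtil.unreverseUrl(value.getKey());
        try {
          String host = new URL(urlString).getHost().toLowerCase();
          // Same host => same partition; the random map key plays no role here.
          return (host.hashCode() & Integer.MAX_VALUE) % numPartitions;
        } catch (MalformedURLException e) {
          // Fall back to hashing the whole string for unparsable URLs.
          return (urlString.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
      }
    }

The random key only randomizes the order in which entries reach a reducer;
grouping per host (and therefore per FetchItemQueue) still comes from the
partitioner, which is what keeps the fetcher polite even when one host
dominates the table.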

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java Wed Jun 30 10:36:20 2010
@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URL;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -16,34 +17,35 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.TableReducer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.nutch.crawl.CrawlDatumHbase;
-import org.apache.nutch.fetcher.Fetcher;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.nutch.crawl.CrawlStatus;
+import org.apache.nutch.crawl.URLWebPage;
 import org.apache.nutch.net.URLFilterException;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.ParseUtil;
 import org.apache.nutch.protocol.Content;
 import org.apache.nutch.protocol.Protocol;
 import org.apache.nutch.protocol.ProtocolFactory;
 import org.apache.nutch.protocol.ProtocolOutput;
-import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.ProtocolStatusCodes;
 import org.apache.nutch.protocol.RobotRules;
+import org.apache.nutch.storage.Mark;
+import org.apache.nutch.storage.ProtocolStatus;
+import org.apache.nutch.storage.ProtocolStatusUtils;
+import org.apache.nutch.storage.WebPage;
 import org.apache.nutch.util.LogUtil;
+import org.apache.nutch.util.TableUtil;
 import org.apache.nutch.util.URLUtil;
-import org.apache.nutch.util.hbase.WebTableColumns;
-import org.apache.nutch.util.hbase.TableUtil;
-import org.apache.nutch.util.hbase.WebTableRow;
+import org.gora.mapreduce.GoraReducer;
 
 public class FetcherReducer
-extends TableReducer<ImmutableBytesWritable, FetchEntry, ImmutableBytesWritable> {
+extends GoraReducer<IntWritable, FetchEntry, String, WebPage> {
 
-  public static final Log LOG = Fetcher.LOG;
+  public static final Log LOG = FetcherJob.LOG;
 
   private final AtomicInteger activeThreads = new AtomicInteger(0);
   private final AtomicInteger spinWaiting = new AtomicInteger(0);
@@ -57,34 +59,35 @@ extends TableReducer<ImmutableBytesWrita
 
   private QueueFeeder feeder;
 
-  private List<FetcherThread> fetcherThreads = new ArrayList<FetcherThread>();
+  private final List<FetcherThread> fetcherThreads = new ArrayList<FetcherThread>();
 
   private FetchItemQueues fetchQueues;
-  
-  private static final ImmutableBytesWritable REDUCE_KEY =
-    new ImmutableBytesWritable(TableUtil.YES_VAL);
+
+  private boolean isParsing;
+
+  private ParseUtil parseUtil;
 
   /**
    * This class described the item to be fetched.
    */
   private static class FetchItem {
-    WebTableRow row;
+    WebPage page;
     String queueID;
     String url;
     URL u;
 
-    public FetchItem(String url, WebTableRow row, URL u, String queueID) {
-      this.row = row;
+    public FetchItem(String url, WebPage page, URL u, String queueID) {
+      this.page = page;
       this.url = url;
       this.u = u;
       this.queueID = queueID;
     }
 
-    /** Create an item. Queue id will be created based on <code>byIP</code>
-     * argument, either as a protocol + hostname pair, or protocol + IP
-     * address pair.
+    /** Create an item. Queue id will be created based on <code>queueMode</code>
+     * argument, either as a protocol + hostname pair, a protocol + IP
+     * address pair, or a protocol + domain pair.
      */
-    public static FetchItem create(String url, WebTableRow row, boolean byIP) {
+    public static FetchItem create(String url, WebPage page, String queueMode) {
       String queueID;
       URL u = null;
       try {
@@ -95,7 +98,7 @@ extends TableReducer<ImmutableBytesWrita
       }
       final String proto = u.getProtocol().toLowerCase();
       String host;
-      if (byIP) {
+      if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
         try {
           final InetAddress addr = InetAddress.getByName(u.getHost());
           host = addr.getHostAddress();
@@ -104,16 +107,23 @@ extends TableReducer<ImmutableBytesWrita
           LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
           return null;
         }
-      } else {
+      }
+      else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)){
+        host = URLUtil.getDomainName(u);
+        if (host == null) {
+          LOG.warn("Unknown domain for url: " + url + ", using URL string as key");
+          host = u.toExternalForm();
+        }
+      }
+      else {
         host = u.getHost();
         if (host == null) {
-          LOG.warn("Unknown host for url: " + url + ", skipping.");
-          return null;
+          LOG.warn("Unknown host for url: " + url + ", using URL string as key");
+          host = u.toExternalForm();
         }
-        host = host.toLowerCase();
       }
-      queueID = proto + "://" + host;
-      return new FetchItem(url, row, u, queueID);
+      queueID = proto + "://" + host.toLowerCase();
+      return new FetchItem(url, page, u, queueID);
     }
 
   }
@@ -203,6 +213,12 @@ extends TableReducer<ImmutableBytesWrita
       else
         nextFetchTime.set(endTime);
     }
+    
+    public synchronized int emptyQueue() {
+      int presize = queue.size();
+      queue.clear();
+      return presize;
+    }
   }
 
   /**
@@ -215,18 +231,30 @@ extends TableReducer<ImmutableBytesWrita
     Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
     AtomicInteger totalSize = new AtomicInteger(0);
     int maxThreads;
-    boolean byIP;
+    String queueMode;
     long crawlDelay;
     long minCrawlDelay;
     Configuration conf;
+    long timelimit = -1;
+
+    public static final String QUEUE_MODE_HOST = "byHost";
+    public static final String QUEUE_MODE_DOMAIN = "byDomain";
+    public static final String QUEUE_MODE_IP = "byIP";
 
     public FetchItemQueues(Configuration conf) {
       this.conf = conf;
-      this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
-      // backward-compatible default setting
-      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
+      this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
+      queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
+      // check that the mode is known
+      if (!queueMode.equals(QUEUE_MODE_IP) && !queueMode.equals(QUEUE_MODE_DOMAIN)
+          && !queueMode.equals(QUEUE_MODE_HOST)) {
+        LOG.error("Unknown partition mode : " + queueMode + " - forcing to byHost");
+        queueMode = QUEUE_MODE_HOST;
+      }
+      LOG.info("Using queue mode : "+queueMode);
       this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
       this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+      this.timelimit = conf.getLong("fetcher.timelimit", -1);
     }
 
     public int getTotalSize() {
@@ -237,8 +265,8 @@ extends TableReducer<ImmutableBytesWrita
       return queues.size();
     }
 
-    public void addFetchItem(String url, WebTableRow row) {
-      final FetchItem it = FetchItem.create(url, row, byIP);
+    public void addFetchItem(String url, WebPage page) {
+      final FetchItem it = FetchItem.create(url, page, queueMode);
       if (it != null) addFetchItem(it);
     }
 
@@ -290,6 +318,29 @@ extends TableReducer<ImmutableBytesWrita
       }
       return null;
     }
+    
+    public synchronized int checkTimelimit() {
+      int count = 0;
+      if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+        // emptying the queues
+        for (String id : queues.keySet()) {
+          FetchItemQueue fiq = queues.get(id);
+          if (fiq.getQueueSize() == 0) continue;
+          LOG.info("* queue: " + id + " >> timelimit! ");
+          int deleted = fiq.emptyQueue();
+          for (int i = 0; i < deleted; i++) {
+            totalSize.decrementAndGet();
+          }
+          count += deleted;
+        }
+        // there might also be a case where totalSize != 0 but the number of
+        // queues == 0, in which case we simply force totalSize to 0 to avoid
+        // blocking
+        if (totalSize.get() != 0 && queues.size() == 0) totalSize.set(0);
+      }
+      return count;
+    }
+    
 
     public synchronized void dump() {
       for (final String id : queues.keySet()) {
@@ -315,7 +366,7 @@ extends TableReducer<ImmutableBytesWrita
     private String reprUrl;
     private boolean redirecting;
     private int redirectCount;
-    private Context context;
+    private final Context context;
 
     public FetcherThread(Context context, int num) {
       this.setDaemon(true);                       // don't hang JVM on exit
@@ -358,10 +409,10 @@ extends TableReducer<ImmutableBytesWrita
             }
           }
           lastRequestStart.set(System.currentTimeMillis());
-          if (!fit.row.hasColumn(WebTableColumns.REPR_URL, null)) {
-            reprUrl = fit.url.toString();
+          if (!fit.page.isReadable(WebPage.Field.REPR_URL.getIndex())) {
+            reprUrl = fit.url;
           } else {
-            reprUrl = fit.row.getReprUrl();
+            reprUrl = TableUtil.toString(fit.page.getReprUrl());
           }
           try {
             LOG.info("fetching " + fit.url);
@@ -375,15 +426,15 @@ extends TableReducer<ImmutableBytesWrita
               }
               redirecting = false;
               final Protocol protocol = this.protocolFactory.getProtocol(fit.url);
-              final RobotRules rules = protocol.getRobotRules(fit.url, fit.row);
+              final RobotRules rules = protocol.getRobotRules(fit.url, fit.page);
               if (!rules.isAllowed(fit.u)) {
                 // unblock
                 fetchQueues.finishFetchItem(fit, true);
                 if (LOG.isDebugEnabled()) {
                   LOG.debug("Denied by robots.txt: " + fit.url);
                 }
-                output(fit, null, ProtocolStatus.STATUS_ROBOTS_DENIED,
-                    CrawlDatumHbase.STATUS_GONE);
+                output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED,
+                    CrawlStatus.STATUS_GONE);
                 continue;
               }
               if (rules.getCrawlDelay() > 0) {
@@ -391,79 +442,83 @@ extends TableReducer<ImmutableBytesWrita
                   // unblock
                   fetchQueues.finishFetchItem(fit, true);
                   LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
-                  output(fit, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatumHbase.STATUS_GONE);
+                  output(fit, null, ProtocolStatusUtils.STATUS_ROBOTS_DENIED, CrawlStatus.STATUS_GONE);
                   continue;
                 } else {
                   final FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
                   fiq.crawlDelay = rules.getCrawlDelay();
                 }
               }
-              final ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.row);
+              final ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.page);
               final ProtocolStatus status = output.getStatus();
               final Content content = output.getContent();
               // unblock queue
               fetchQueues.finishFetchItem(fit);
 
+              context.getCounter("FetcherStatus", "code_"+status.getCode()).increment(1);
+
+              int length = 0;
+              if (content != null && content.getContent() != null) length = content.getContent().length;
+              updateStatus(length);
+
               switch(status.getCode()) {
 
-              case ProtocolStatus.WOULDBLOCK:
+              case ProtocolStatusCodes.WOULDBLOCK:
                 // retry ?
                 fetchQueues.addFetchItem(fit);
                 break;
 
-              case ProtocolStatus.SUCCESS:        // got a page
-                output(fit, content, status, CrawlDatumHbase.STATUS_FETCHED);
-                updateStatus(content.getContent().length);
+              case ProtocolStatusCodes.SUCCESS:        // got a page
+                output(fit, content, status, CrawlStatus.STATUS_FETCHED);
                 break;
 
-              case ProtocolStatus.MOVED:         // redirect
-              case ProtocolStatus.TEMP_MOVED:
+              case ProtocolStatusCodes.MOVED:         // redirect
+              case ProtocolStatusCodes.TEMP_MOVED:
                 byte code;
                 boolean temp;
-                if (status.getCode() == ProtocolStatus.MOVED) {
-                  code = CrawlDatumHbase.STATUS_REDIR_PERM;
+                if (status.getCode() == ProtocolStatusCodes.MOVED) {
+                  code = CrawlStatus.STATUS_REDIR_PERM;
                   temp = false;
                 } else {
-                  code = CrawlDatumHbase.STATUS_REDIR_TEMP;
+                  code = CrawlStatus.STATUS_REDIR_TEMP;
                   temp = true;
                 }
                 output(fit, content, status, code);
-                final String newUrl = status.getMessage();
-                handleRedirect(fit.url, newUrl, temp,  Fetcher.PROTOCOL_REDIR);
+                final String newUrl = ProtocolStatusUtils.getMessage(status);
+                handleRedirect(fit.url, newUrl, temp,  FetcherJob.PROTOCOL_REDIR);
                 redirecting = false;
                 break;
-              case ProtocolStatus.EXCEPTION:
-                logError(fit.url, status.getMessage());
+              case ProtocolStatusCodes.EXCEPTION:
+                logError(fit.url, ProtocolStatusUtils.getMessage(status));
                 /* FALLTHROUGH */
-              case ProtocolStatus.RETRY:          // retry
-              case ProtocolStatus.BLOCKED:
-                output(fit, null, status, CrawlDatumHbase.STATUS_RETRY);
+              case ProtocolStatusCodes.RETRY:          // retry
+              case ProtocolStatusCodes.BLOCKED:
+                output(fit, null, status, CrawlStatus.STATUS_RETRY);
                 break;
 
-              case ProtocolStatus.GONE:           // gone
-              case ProtocolStatus.NOTFOUND:
-              case ProtocolStatus.ACCESS_DENIED:
-              case ProtocolStatus.ROBOTS_DENIED:
-                output(fit, null, status, CrawlDatumHbase.STATUS_GONE);
+              case ProtocolStatusCodes.GONE:           // gone
+              case ProtocolStatusCodes.NOTFOUND:
+              case ProtocolStatusCodes.ACCESS_DENIED:
+              case ProtocolStatusCodes.ROBOTS_DENIED:
+                output(fit, null, status, CrawlStatus.STATUS_GONE);
                 break;
 
-              case ProtocolStatus.NOTMODIFIED:
-                output(fit, null, status, CrawlDatumHbase.STATUS_NOTMODIFIED);
+              case ProtocolStatusCodes.NOTMODIFIED:
+                output(fit, null, status, CrawlStatus.STATUS_NOTMODIFIED);
                 break;
 
               default:
                 if (LOG.isWarnEnabled()) {
                   LOG.warn("Unknown ProtocolStatus: " + status.getCode());
                 }
-                output(fit, null, status, CrawlDatumHbase.STATUS_RETRY);
+                output(fit, null, status, CrawlStatus.STATUS_RETRY);
               }
 
               if (redirecting && redirectCount >= maxRedirect) {
                 fetchQueues.finishFetchItem(fit);
-                if (LOG.isInfoEnabled()) {
-                  LOG.info(" - redirect count exceeded " + fit.url);
-                }
-                output(fit, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatumHbase.STATUS_GONE);
+                LOG.info(" - redirect count exceeded " + fit.url);
+                output(fit, null, ProtocolStatusUtils.STATUS_REDIR_EXCEEDED,
+                    CrawlStatus.STATUS_GONE);
               }
 
             } while (redirecting && (redirectCount < maxRedirect));
@@ -471,17 +526,16 @@ extends TableReducer<ImmutableBytesWrita
           } catch (final Throwable t) {                 // unexpected exception
             // unblock
             fetchQueues.finishFetchItem(fit);
-            t.printStackTrace();
             logError(fit.url, t.toString());
-            output(fit, null, ProtocolStatus.STATUS_FAILED, CrawlDatumHbase.STATUS_RETRY);
+            t.printStackTrace(LogUtil.getDebugStream(LOG));
+            output(fit, null, ProtocolStatusUtils.STATUS_FAILED,
+                CrawlStatus.STATUS_RETRY);
           }
         }
 
       } catch (final Throwable e) {
-        if (LOG.isFatalEnabled()) {
-          e.printStackTrace(LogUtil.getFatalStream(LOG));
-          LOG.fatal("fetcher caught:"+e.toString());
-        }
+        LOG.fatal("fetcher caught:"+e.toString());
+        e.printStackTrace(LogUtil.getFatalStream(LOG));
       } finally {
         if (fit != null) fetchQueues.finishFetchItem(fit);
         activeThreads.decrementAndGet(); // count threads
@@ -499,14 +553,12 @@ extends TableReducer<ImmutableBytesWrita
       }
       reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
       final String reversedNewUrl = TableUtil.reverseUrl(newUrl);
-      // TODO: Find a way to use MutablWebTableRow here
-      Put put = new Put(Bytes.toBytes(reversedNewUrl));
+      WebPage newWebPage = new WebPage();
       if (!reprUrl.equals(url)) {
-        put.add(WebTableColumns.REPR_URL, null, Bytes.toBytes(reprUrl));
+        newWebPage.setReprUrl(new Utf8(reprUrl));
       }
-      put.add(WebTableColumns.METADATA,
-              Fetcher.REDIRECT_DISCOVERED, TableUtil.YES_VAL);
-      context.write(REDUCE_KEY, put);
+      newWebPage.putToMetadata(FetcherJob.REDIRECT_DISCOVERED, TableUtil.YES_VAL);
+      context.write(reversedNewUrl, newWebPage);
       if (LOG.isDebugEnabled()) {
         LOG.debug(" - " + redirType + " redirect to " +
             reprUrl + " (fetching later)");
@@ -522,27 +574,34 @@ extends TableReducer<ImmutableBytesWrita
     private void output(FetchItem fit, Content content,
         ProtocolStatus pstatus, byte status)
     throws IOException, InterruptedException {
-      fit.row.setStatus(status);
-      final long prevFetchTime = fit.row.getFetchTime();
-      fit.row.setPrevFetchTime(prevFetchTime);
-      fit.row.setFetchTime(System.currentTimeMillis());
+      fit.page.setStatus(status);
+      final long prevFetchTime = fit.page.getFetchTime();
+      fit.page.setPrevFetchTime(prevFetchTime);
+      fit.page.setFetchTime(System.currentTimeMillis());
       if (pstatus != null) {
-        fit.row.setProtocolStatus(pstatus);
+        fit.page.setProtocolStatus(pstatus);
       }
 
       if (content != null) {
-        fit.row.setContent(content.getContent());
-        fit.row.setContentType(content.getContentType());
-        fit.row.setBaseUrl(content.getBaseUrl());
+        fit.page.setContent(ByteBuffer.wrap(content.getContent()));
+        fit.page.setContentType(new Utf8(content.getContentType()));
+        fit.page.setBaseUrl(new Utf8(content.getBaseUrl()));
+      }
+      Mark.FETCH_MARK.putMark(fit.page, Mark.GENERATE_MARK.checkMark(fit.page));
+      String key = TableUtil.reverseUrl(fit.url);
+
+      if (isParsing) {
+        URLWebPage redirectedPage = parseUtil.process(key, fit.page);
+        if (redirectedPage != null) {
+          context.write(TableUtil.reverseUrl(redirectedPage.getUrl()),
+                        redirectedPage.getDatum());
+        }
       }
-      fit.row.putMeta(Fetcher.FETCH_MARK, TableUtil.YES_VAL);
-      fit.row.makeRowMutation().writeToContext(REDUCE_KEY, context);
+      context.write(key, fit.page);
     }
 
     private void logError(String url, String message) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("fetch of " + url + " failed with: " + message);
-      }
+      LOG.info("fetch of " + url + " failed with: " + message);
       errors.incrementAndGet();
     }
   }
@@ -557,6 +616,7 @@ extends TableReducer<ImmutableBytesWrita
     private final int size;
     private Iterator<FetchEntry> currentIter;
     boolean hasMore;
+    private long timelimit = -1;
 
     public QueueFeeder(Context context,
         FetchItemQueues queues, int size)
@@ -570,13 +630,29 @@ extends TableReducer<ImmutableBytesWrita
       if (hasMore) {
         currentIter = context.getValues().iterator();
       }
+      // the time limit is either -1 (disabled) or the absolute time at which feeding should stop
+      timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1); 
     }
 
     @Override
     public void run() {
       int cnt = 0;
+      int timelimitcount = 0;
       try {
         while (hasMore) {
+          if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+            // the time limit has been reached: simply
+            // read all the entries from the input without processing them
+            while (currentIter.hasNext()) {
+              currentIter.next();
+              timelimitcount++;
+            }
+            hasMore = context.nextKey();
+            if (hasMore) {
+              currentIter = context.getValues().iterator();
+            }
+            continue;
+          }
           int feed = size - queues.getTotalSize();
           if (feed <= 0) {
             // queues are full - spin-wait until they have some free space
@@ -584,19 +660,15 @@ extends TableReducer<ImmutableBytesWrita
               Thread.sleep(1000);
             } catch (final Exception e) {};
             continue;
-          } 
+          }
           if (LOG.isDebugEnabled()) {
             LOG.debug("-feeding " + feed + " input urls ...");
           }
           while (feed > 0 && currentIter.hasNext()) {
-            FetchEntry entry = new FetchEntry();
-            // since currentIter.next() reuses the same
-            // FetchEntry object we need to clone it
-            Writables.copyWritable(currentIter.next(), entry);
-            WebTableRow row = new WebTableRow(entry.getRow());
+            FetchEntry entry = currentIter.next();
             final String url =
-              TableUtil.unreverseUrl(Bytes.toString(entry.getKey().get()));
-            queues.addFetchItem(url, row);
+              TableUtil.unreverseUrl(entry.getKey());
+            queues.addFetchItem(url, entry.getWebPage());
             feed--;
             cnt++;
           }
@@ -612,17 +684,35 @@ extends TableReducer<ImmutableBytesWrita
         LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
         return;
       }
-      LOG.info("QueueFeeder finished: total " + cnt + " records.");
+      LOG.info("QueueFeeder finished: total " + cnt + " records. Hit by time limit :"
+          + timelimitcount);
+      context.getCounter("FetcherStatus","HitByTimeLimit-QueueFeeder").increment(timelimitcount);
     }
   }
 
+  private void reportStatus(Context context) throws IOException {
+    StringBuffer status = new StringBuffer();
+    long elapsed = (System.currentTimeMillis() - start)/1000;
+    status.append(spinWaiting).append("/").append(activeThreads).append(" threads spinwaiting\n");
+    status.append(pages).append(" pages, ").append(errors).append(" errors, ");
+    status.append(Math.round(((float)pages.get()*10)/elapsed)/10.0).append(" pages/s, ");
+    status.append(Math.round(((((float)bytes.get())*8)/1024)/elapsed)).append(" kb/s, ");
+    status.append(this.fetchQueues.getTotalSize()).append(" URLs in ");
+    status.append(this.fetchQueues.getQueueCount()).append(" queues");
+    context.setStatus(status.toString());
+  }
+
   @Override
   public void run(Context context)
   throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
     this.fetchQueues = new FetchItemQueues(conf);
-    final int threadCount = conf.getInt("fetcher.threads.fetch", 10);
-    LOG.info("FetcherHbase: threads: " + threadCount);
+    int threadCount = conf.getInt("fetcher.threads.fetch", 10);
+    isParsing = conf.getBoolean("fetcher.parse", true);
+    if (isParsing) {
+      parseUtil = new ParseUtil(conf);
+    }
+    LOG.info("Fetcher: threads: " + threadCount);
 
     // set non-blocking & no-robots mode for HTTP protocol plugins.
     conf.setBoolean(Protocol.CHECK_BLOCKING, false);
@@ -645,12 +735,20 @@ extends TableReducer<ImmutableBytesWrita
       } catch (final InterruptedException e) {}
 
       context.progress();
+      reportStatus(context);
       LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
           + ", fetchQueues= " + fetchQueues.getQueueCount() +", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
 
       if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
         fetchQueues.dump();
       }
+      
+      // check timelimit
+      if (!feeder.isAlive()) {
+        int hitByTimeLimit = fetchQueues.checkTimelimit();
+        if (hitByTimeLimit != 0) context.getCounter("FetcherStatus", "HitByTimeLimit-Queues").increment(hitByTimeLimit);
+      }
+      
       // some requests seem to hang, despite all intentions
       if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
         LOG.warn("Aborting with " + activeThreads + " hung threads.");

Modified: nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java?rev=959259&r1=959258&r2=959259&view=diff
==============================================================================
--- nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java (original)
+++ nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java Wed Jun 30 10:36:20 2010
@@ -2,17 +2,17 @@ package org.apache.nutch.fetcher;
 
 import java.net.URL;
 
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.nutch.util.TableUtil;
 
 public class PartitionUrlByHost
-extends Partitioner<ImmutableBytesWritable, FetchEntry> {
+extends Partitioner<IntWritable, FetchEntry> {
 
   @Override
-  public int getPartition(ImmutableBytesWritable key,
+  public int getPartition(IntWritable key,
       FetchEntry value, int numPartitions) {
-    String urlString = Bytes.toString(value.getKey().get());
+    String urlString = TableUtil.unreverseUrl(value.getKey());
 
     URL url = null;