Posted to commits@nutch.apache.org by do...@apache.org on 2009/08/17 00:25:17 UTC

svn commit: r804789 [2/6] - in /lucene/nutch/branches/nutchbase: ./ bin/ conf/ lib/ src/java/org/apache/nutch/analysis/ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/ src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/ind...

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Generator.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Generator.java Sun Aug 16 22:25:12 2009
@@ -1,626 +1,185 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.nutch.crawl;
 
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.text.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
 
-// Commons Logging imports
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.nutch.metadata.Nutch;
-import org.apache.nutch.net.URLFilterException;
-import org.apache.nutch.net.URLFilters;
-import org.apache.nutch.net.URLNormalizers;
-import org.apache.nutch.scoring.ScoringFilterException;
-import org.apache.nutch.scoring.ScoringFilters;
-import org.apache.nutch.util.LockUtil;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
-
-/** Generates a subset of a crawl db to fetch. */
-public class Generator extends Configured implements Tool {
-
+import org.apache.nutch.util.URLUtil;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class Generator
+extends Configured
+implements Tool {
   public static final String CRAWL_GENERATE_FILTER = "crawl.generate.filter";
-  public static final String GENERATE_MAX_PER_HOST_BY_IP = "generate.max.per.host.by.ip";
   public static final String GENERATE_MAX_PER_HOST = "generate.max.per.host";
-  public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
   public static final String CRAWL_TOP_N = "crawl.topN";
   public static final String CRAWL_GEN_CUR_TIME = "crawl.gen.curTime";
-  public static final String CRAWL_GEN_DELAY = "crawl.gen.delay";
-  public static final Log LOG = LogFactory.getLog(Generator.class);
+  public static final String CRAWL_RANDOM_SEED = "generate.partition.seed";
   
-  public static class SelectorEntry implements Writable {
-    public Text url;
-    public CrawlDatum datum;
-    
-    public SelectorEntry() {
-      url = new Text();
-      datum = new CrawlDatum();
-    }
-
-    public void readFields(DataInput in) throws IOException {
-      url.readFields(in);
-      datum.readFields(in);
-    }
-
-    public void write(DataOutput out) throws IOException {
-      url.write(out);
-      datum.write(out);
-    }
-    
-    public String toString() {
-      return "url=" + url.toString() + ", datum=" + datum.toString();
-    }
+  private static final Set<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+  
+  static {
+    COLUMNS.add(new HbaseColumn(WebTableColumns.FETCH_TIME));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.SCORE));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.STATUS));
   }
-
-  /** Selects entries due for fetch. */
-  public static class Selector implements Mapper<Text, CrawlDatum, FloatWritable, SelectorEntry>, Partitioner<FloatWritable, Writable>, Reducer<FloatWritable, SelectorEntry, FloatWritable, SelectorEntry> {
-    private LongWritable genTime = new LongWritable(System.currentTimeMillis());
-    private long curTime;
-    private long limit;
-    private long count;
-    private HashMap<String, IntWritable> hostCounts =
-      new HashMap<String, IntWritable>();
-    private int maxPerHost;
-    private HashSet<String> maxedHosts = new HashSet<String>();
-    private HashSet<String> dnsFailureHosts = new HashSet<String>();
-    private Partitioner<Text, Writable> hostPartitioner = new PartitionUrlByHost();
-    private URLFilters filters;
-    private URLNormalizers normalizers;
-    private ScoringFilters scfilters;
-    private SelectorEntry entry = new SelectorEntry();
-    private FloatWritable sortValue = new FloatWritable();
-    private boolean byIP;
-    private long dnsFailure = 0L;
-    private boolean filter;
-    private long genDelay;
-    private FetchSchedule schedule;
-
-    public void configure(JobConf job) {
-      curTime = job.getLong(CRAWL_GEN_CUR_TIME, System.currentTimeMillis());
-      limit = job.getLong(CRAWL_TOP_N,Long.MAX_VALUE)/job.getNumReduceTasks();
-      maxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
-      byIP = job.getBoolean(GENERATE_MAX_PER_HOST_BY_IP, false);
-      filters = new URLFilters(job);
-      normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
-      scfilters = new ScoringFilters(job);
-      hostPartitioner.configure(job);
-      filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
-      genDelay = job.getLong(CRAWL_GEN_DELAY, 7L) * 3600L * 24L * 1000L;
-      long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
-      if (time > 0) genTime.set(time);
-      schedule = FetchScheduleFactory.getFetchSchedule(job);
-    }
-
-    public void close() {}
-
-    /** Select & invert subset due for fetch. */
-    public void map(Text key, CrawlDatum value,
-                    OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
-      throws IOException {
-      Text url = key;
-      if (filter) {
-        // If filtering is on don't generate URLs that don't pass URLFilters
-        try {
-          if (filters.filter(url.toString()) == null)
-            return;
-        } catch (URLFilterException e) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage()
-                + ")");
-          }
-        }
-      }
-      CrawlDatum crawlDatum = value;
-
-      // check fetch schedule
-      if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
-        LOG.debug("-shouldFetch rejected '" + url+ "', fetchTime=" + crawlDatum.getFetchTime() + ", curTime=" + curTime);
-        return;
-      }
-
-      LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
-      if (oldGenTime != null) { // awaiting fetch & update
-        if (oldGenTime.get() + genDelay > curTime) // still wait for update
-          return;
-      }
-      float sort = 1.0f;
-      try {
-        sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort);
-      } catch (ScoringFilterException sfe) {
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
-        }
-      }
-      // sort by decreasing score, using DecreasingFloatComparator
-      sortValue.set(sort);
-      // record generation time
-      crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
-      entry.datum = crawlDatum;
-      entry.url = (Text)key;
-      output.collect(sortValue, entry);          // invert for sort by score
-    }
-
-    /** Partition by host. */
-    public int getPartition(FloatWritable key, Writable value,
-                            int numReduceTasks) {
-      return hostPartitioner.getPartition(((SelectorEntry)value).url, key,
-                                          numReduceTasks);
-    }
-
-    /** Collect until limit is reached. */
-    public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
-                       OutputCollector<FloatWritable, SelectorEntry> output,
-                       Reporter reporter)
-      throws IOException {
-
-      while (values.hasNext() && count < limit) {
-
-        SelectorEntry entry = values.next();
-        Text url = entry.url;        
-        String urlString = url.toString();        
-        URL u = null;
-        
-        // skip bad urls, including empty and null urls
-        try {
-          u = new URL(url.toString());
-        } catch (MalformedURLException e) {
-          LOG.info("Bad protocol in url: " + url.toString());
-          continue;
-        }
-        
-        String host = u.getHost();
-        host = host.toLowerCase();
-        String hostname = host;
-
-        // partitioning by ip will generate lots of DNS requests here, and will 
-        // be up to double the overall dns load, do not run this way unless you
-        // are running a local caching DNS server or a two layer DNS cache
-        if (byIP) {
-          if (maxedHosts.contains(host)) {
-            if (LOG.isDebugEnabled()) { LOG.debug("Host already maxed out: " + host); }
-            continue;
-          }
-          if (dnsFailureHosts.contains(host)) {
-            if (LOG.isDebugEnabled()) { LOG.debug("Host name lookup already failed: " + host); }
-            continue;
-          }
-          try {
-            InetAddress ia = InetAddress.getByName(host);
-            host = ia.getHostAddress();
-            urlString = new URL(u.getProtocol(), host, u.getPort(), u.getFile()).toString();
-          } 
-          catch (UnknownHostException uhe) {
-            // remember hostnames that could not be looked up
-            dnsFailureHosts.add(hostname);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DNS lookup failed: " + host + ", skipping.");
-            }
-            dnsFailure++;
-            if ((dnsFailure % 1000 == 0) && (LOG.isWarnEnabled())) {
-              LOG.warn("DNS failures: " + dnsFailure);
-            }
-            continue;
-          }
-        }
-        
-        try {
-          urlString = normalizers.normalize(urlString, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
-          host = new URL(urlString).getHost();
-        } catch (Exception e) {
-          LOG.warn("Malformed URL: '" + urlString + "', skipping (" +
-              StringUtils.stringifyException(e) + ")");
-          continue;
-        }
-        
-        // only filter if we are counting hosts
-        if (maxPerHost > 0) {
-          
-          IntWritable hostCount = hostCounts.get(host);
-          if (hostCount == null) {
-            hostCount = new IntWritable();
-            hostCounts.put(host, hostCount);
-          }
   
-          // increment hostCount
-          hostCount.set(hostCount.get() + 1);
+  public static final byte[] GENERATOR_MARK =
+    Bytes.toBytes("__genmrk__");
   
-          // skip URL if above the limit per host.
-          if (hostCount.get() > maxPerHost) {
-            if (hostCount.get() == maxPerHost + 1) {
-              // remember the raw hostname that is maxed out
-              maxedHosts.add(hostname);
-              if (LOG.isInfoEnabled()) {
-                LOG.info("Host " + host + " has more than " + maxPerHost +
-                         " URLs." + " Skipping additional.");
-              }
-            }
-            continue;
-          }
-        }
-
-        output.collect(key, entry);
-
-        // Count is incremented only when we keep the URL
-        // maxPerHost may cause us to skip it.
-        count++;
-      }
-    }
-  }
+  public static final Log LOG = LogFactory.getLog(Generator.class);
 
-  public static class DecreasingFloatComparator extends FloatWritable.Comparator {
+  public static class SelectorEntry
+  implements WritableComparable<SelectorEntry> {
 
-    /** Compares two FloatWritables decreasing. */
-    public int compare(byte[] b1, int s1, int l1,
-        byte[] b2, int s2, int l2) {
-      return super.compare(b2, s2, l2, b1, s1, l1);
+    String url;
+    String host;
+    float score;
+    
+    public SelectorEntry() {  }
+    
+    public SelectorEntry(String url, float score) {
+      this.url = url;
+      this.host = URLUtil.getHost(url);
+      this.score = score;
     }
-  }
 
-  public static class SelectorInverseMapper extends MapReduceBase implements Mapper<FloatWritable, SelectorEntry, Text, SelectorEntry> {
+    public void readFields(DataInput in) throws IOException {
+      url = Text.readString(in);
+      host = Text.readString(in);
+      score = in.readFloat();
+    }
 
-    public void map(FloatWritable key, SelectorEntry value, OutputCollector<Text, SelectorEntry> output, Reporter reporter) throws IOException {
-      SelectorEntry entry = (SelectorEntry)value;
-      output.collect(entry.url, entry);
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, url);
+      Text.writeString(out, host);
+      out.writeFloat(score);
     }
-  }
-  
-  public static class PartitionReducer extends MapReduceBase
-      implements Reducer<Text, SelectorEntry, Text, CrawlDatum> {
 
-    public void reduce(Text key, Iterator<SelectorEntry> values,
-        OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
-      // if using HashComparator, we get only one input key in case of hash collision
-      // so use only URLs from values
-      while (values.hasNext()) {
-        SelectorEntry entry = values.next();
-        output.collect(entry.url, entry.datum);
-      }
+    public int compareTo(SelectorEntry se) {
+      if (se.score > score)
+        return 1;
+      else if (se.score == score)
+        return url.compareTo(se.url);
+      return -1;
     }
     
-  }
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result +  url.hashCode();
+      result = prime * result + Float.floatToIntBits(score);
+      return result;
+    }
 
-  /** Sort fetch lists by hash of URL. */
-  public static class HashComparator extends WritableComparator {
-    public HashComparator() {
-      super(Text.class);
-    }
-
-    public int compare(WritableComparable a, WritableComparable b) {
-      Text url1 = (Text) a;
-      Text url2 = (Text) b;
-      int hash1 = hash(url1.getBytes(), 0, url1.getLength());
-      int hash2 = hash(url2.getBytes(), 0, url2.getLength());
-      return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
-    }
-
-    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-      int hash1 = hash(b1, s1, l1);
-      int hash2 = hash(b2, s2, l2);
-      return (hash1 < hash2 ? -1 : (hash1 == hash2 ? 0 : 1));
-    }
-
-    private static int hash(byte[] bytes, int start, int length) {
-      int hash = 1;
-      // make later bytes more significant in hash code, so that sorting by
-      // hashcode correlates less with by-host ordering.
-      for (int i = length - 1; i >= 0; i--)
-        hash = (31 * hash) + (int) bytes[start + i];
-      return hash;
+    @Override
+    public boolean equals(Object obj) {
+      SelectorEntry other = (SelectorEntry) obj;
+      if (!url.equals(other.url))
+        return false;
+      if (Float.floatToIntBits(score) != Float.floatToIntBits(other.score))
+        return false;
+      return true;
     }
   }
 
-  /**
-   * Update the CrawlDB so that the next generate won't include the same URLs.
-   */
-  public static class CrawlDbUpdater extends MapReduceBase implements Mapper<WritableComparable, Writable, Text, CrawlDatum>, Reducer<Text, CrawlDatum, Text, CrawlDatum> {
-    long generateTime;
-    
-    public void configure(JobConf job) {
-      generateTime = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
+  public static class SelectorEntryComparator extends WritableComparator {
+    public SelectorEntryComparator() {
+      super(SelectorEntry.class, true);
     }
-    
-    public void map(WritableComparable key, Writable value, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
-      if (key instanceof FloatWritable) { // tempDir source
-        SelectorEntry se = (SelectorEntry)value;
-        output.collect(se.url, se.datum);
-      } else {
-        output.collect((Text)key, (CrawlDatum)value);
-      }
-    }
-    private CrawlDatum orig = new CrawlDatum();
-    private LongWritable genTime = new LongWritable(0L);
-
-    public void reduce(Text key, Iterator<CrawlDatum> values, OutputCollector<Text, CrawlDatum> output, Reporter reporter) throws IOException {
-      while (values.hasNext()) {
-        CrawlDatum val = values.next();
-        if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
-          LongWritable gt = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
-          genTime.set(gt.get());
-          if (genTime.get() != generateTime) {
-            orig.set(val);
-            genTime.set(0L);
-            continue;
-          }
-        } else {
-          orig.set(val);
-        }
-      }
-      if (genTime.get() != 0L) {
-        orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
-      }
-      output.collect(key, orig);
-    }    
   }
   
-  public Generator() {}
-  
-  public Generator(Configuration conf) {
-    setConf(conf);
+  static {
+    WritableComparator.define(SelectorEntry.class,
+                              new SelectorEntryComparator());
   }
   
   /**
-   * Generate fetchlists in a segment. Whether to filter URLs or not is
-   * read from the crawl.generate.filter property in the configuration
-   * files. If the property is not found, the URLs are filtered.
-   *
-   * @param dbDir     Crawl database directory
-   * @param segments  Segments directory
-   * @param numLists  Number of reduce tasks
-   * @param topN      Number of top URLs to be selected
-   * @param curTime   Current time in milliseconds
-   *
-   * @return Path to generated segment or null if no entries were
-   *         selected
-   *
-   * @throws IOException When an I/O error occurs
-   */
-  public Path generate(Path dbDir, Path segments, int numLists,
-                       long topN, long curTime) throws IOException {
-
-    JobConf job = new NutchJob(getConf());
-    boolean filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
-    return generate(dbDir, segments, numLists, topN, curTime, filter, false);
-  }
-
-  /**
-   * Generate fetchlists in a segment.
-   * @return Path to generated segment or null if no entries were selected.
+   * Mark URLs ready for fetching.
+   * @throws ClassNotFoundException 
+   * @throws InterruptedException 
    * */
-  public Path generate(Path dbDir, Path segments,
-                       int numLists, long topN, long curTime, boolean filter,
-                       boolean force)
-    throws IOException {
-
-    Path tempDir =
-      new Path(getConf().get("mapred.temp.dir", ".") +
-               "/generate-temp-"+ System.currentTimeMillis());
-
-    Path segment = new Path(segments, generateSegmentName());
-    Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
-    
-    Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
-    FileSystem fs = FileSystem.get(getConf());
-    LockUtil.createLockFile(fs, lock, force);
-
-    LOG.info("Generator: Selecting best-scoring urls due for fetch.");
-    LOG.info("Generator: starting");
-    LOG.info("Generator: segment: " + segment);
-    LOG.info("Generator: filtering: " + filter);
+  public void generate(String table, long topN, long curTime, boolean filter)
+  throws Exception {    
+ 
+    LOG.info("GeneratorHbase: Selecting best-scoring urls due for fetch.");
+    LOG.info("GeneratorHbase: starting");
+    LOG.info("GeneratorHbase: filtering: " + filter);
     if (topN != Long.MAX_VALUE) {
-      LOG.info("Generator: topN: " + topN);
+      LOG.info("GeneratorHbase: topN: " + topN);
     }
-
+ 
     // map to inverted subset due for fetch, sort by score
-    JobConf job = new NutchJob(getConf());
-    job.setJobName("generate: select " + segment);
+    getConf().setLong(CRAWL_GEN_CUR_TIME, curTime);
+    getConf().setLong(CRAWL_TOP_N, topN);
+    getConf().setBoolean(CRAWL_GENERATE_FILTER, filter);
+    getConf().setInt(CRAWL_RANDOM_SEED, new Random().nextInt());
+
+    Job job = new NutchJob(getConf(), "generate-hbase: " + table);
+    job.setJobName("generate-hbase: " + table);
+    Scan scan = TableUtil.createScanFromColumns(COLUMNS);
+    TableMapReduceUtil.initTableMapperJob(table, scan,
+        GeneratorMapper.class, SelectorEntry.class,
+       WebTableRow.class, job);
+    TableMapReduceUtil.initTableReducerJob(table, GeneratorReducer.class, job, PartitionSelectorByHost.class);
 
-    if (numLists == -1) {                         // for politeness make
-      numLists = job.getNumMapTasks();            // a partition per fetch task
-    }
-    if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
-      // override
-      LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
-      numLists = 1;
-    }
-    job.setLong(CRAWL_GEN_CUR_TIME, curTime);
-    // record real generation time
-    long generateTime = System.currentTimeMillis();
-    job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
-    job.setLong(CRAWL_TOP_N, topN);
-    job.setBoolean(CRAWL_GENERATE_FILTER, filter);
-
-    FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
-    job.setInputFormat(SequenceFileInputFormat.class);
-
-    job.setMapperClass(Selector.class);
-    job.setPartitionerClass(Selector.class);
-    job.setReducerClass(Selector.class);
-
-    FileOutputFormat.setOutputPath(job, tempDir);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(FloatWritable.class);
-    job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
-    job.setOutputValueClass(SelectorEntry.class);
-    try {
-      JobClient.runJob(job);
-    } catch (IOException e) {
-      LockUtil.removeLockFile(fs, lock);
-      throw e;
-    }
+    job.waitForCompletion(true);
     
-    // check that we selected at least some entries ...
-    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(job, tempDir);
-    boolean empty = true;
-    if (readers != null && readers.length > 0) {
-      for (int num = 0; num < readers.length; num++) {
-        if (readers[num].next(new FloatWritable())) {
-          empty = false;
-          break;
-        }
-      }
-    }
-    
-    for (int i = 0; i < readers.length; i++) readers[i].close();
-    
-    if (empty) {
-      LOG.warn("Generator: 0 records selected for fetching, exiting ...");
-      LockUtil.removeLockFile(fs, lock);
-      fs.delete(tempDir, true);
-      return null;
-    }
-
-    // invert again, paritition by host, sort by url hash
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Generator: Partitioning selected urls by host, for politeness.");
-    }
-    job = new NutchJob(getConf());
-    job.setJobName("generate: partition " + segment);
-    
-    job.setInt("partition.url.by.host.seed", new Random().nextInt());
-
-    FileInputFormat.addInputPath(job, tempDir);
-    job.setInputFormat(SequenceFileInputFormat.class);
-
-    job.setMapperClass(SelectorInverseMapper.class);
-    job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(SelectorEntry.class);
-    job.setPartitionerClass(PartitionUrlByHost.class);
-    job.setReducerClass(PartitionReducer.class);
-    job.setNumReduceTasks(numLists);
-
-    FileOutputFormat.setOutputPath(job, output);
-    job.setOutputFormat(SequenceFileOutputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(CrawlDatum.class);
-    job.setOutputKeyComparatorClass(HashComparator.class);
-    try {
-      JobClient.runJob(job);
-    } catch (IOException e) {
-      LockUtil.removeLockFile(fs, lock);
-      fs.delete(tempDir, true);
-      throw e;
-    }
-    if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
-      // update the db from tempDir
-      Path tempDir2 =
-        new Path(getConf().get("mapred.temp.dir", ".") +
-                 "/generate-temp-"+ System.currentTimeMillis());
-  
-      job = new NutchJob(getConf());
-      job.setJobName("generate: updatedb " + dbDir);
-      job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
-      FileInputFormat.addInputPath(job, tempDir);
-      FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
-      job.setInputFormat(SequenceFileInputFormat.class);
-      job.setMapperClass(CrawlDbUpdater.class);
-      job.setReducerClass(CrawlDbUpdater.class);
-      job.setOutputFormat(MapFileOutputFormat.class);
-      job.setOutputKeyClass(Text.class);
-      job.setOutputValueClass(CrawlDatum.class);
-      FileOutputFormat.setOutputPath(job, tempDir2);
-      try {
-        JobClient.runJob(job);
-        CrawlDb.install(job, dbDir);
-      } catch (IOException e) {
-        LockUtil.removeLockFile(fs, lock);
-        fs.delete(tempDir, true);
-        fs.delete(tempDir2, true);
-        throw e;
-      }
-      fs.delete(tempDir2, true);
-    }
-    LockUtil.removeLockFile(fs, lock);
-    fs.delete(tempDir, true);
-
-    if (LOG.isInfoEnabled()) { LOG.info("Generator: done."); }
-
-    return segment;
+    LOG.info("GeneratorHbase: done");
   }
-  
-  private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
 
-  public static synchronized String generateSegmentName() {
-    try {
-      Thread.sleep(1000);
-    } catch (Throwable t) {};
-    return sdf.format
-      (new Date(System.currentTimeMillis()));
-  }
-
-  /**
-   * Generate a fetchlist from the crawldb.
-   */
-  public static void main(String args[]) throws Exception {
-    int res = ToolRunner.run(NutchConfiguration.create(), new Generator(), args);
-    System.exit(res);
-  }
-  
   public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      System.out.println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter]");
+    if (args.length < 1) {
+      System.out.println("Usage: GeneratorHbase <webtable> [-topN N] [-noFilter]");
       return -1;
     }
-
-    Path dbDir = new Path(args[0]);
-    Path segmentsDir = new Path(args[1]);
+    
+    String table = args[0];
     long curTime = System.currentTimeMillis();
     long topN = Long.MAX_VALUE;
-    int numFetchers = -1;
     boolean filter = true;
-    boolean force = false;
 
-    for (int i = 2; i < args.length; i++) {
+    for (int i = 1; i < args.length; i++) {
       if ("-topN".equals(args[i])) {
-        topN = Long.parseLong(args[i+1]);
-        i++;
-      } else if ("-numFetchers".equals(args[i])) {
-        numFetchers = Integer.parseInt(args[i+1]);
-        i++;
-      } else if ("-adddays".equals(args[i])) {
-        long numDays = Integer.parseInt(args[i+1]);
-        curTime += numDays * 1000L * 60 * 60 * 24;
+        topN = Long.parseLong(args[++i]);
       } else if ("-noFilter".equals(args[i])) {
         filter = false;
-      } else if ("-force".equals(args[i])) {
-        force = true;
       }
-      
     }
-
+    
     try {
-      Path seg = generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter, force);
-      if (seg == null) return -2;
-      else return 0;
+      generate(table, topN, curTime, filter);
+      return 0;
     } catch (Exception e) {
-      LOG.fatal("Generator: " + StringUtils.stringifyException(e));
+      LOG.fatal("GeneratorHbase: " + StringUtils.stringifyException(e));
       return -1;
     }
   }
+
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new Generator(), args);
+    System.exit(res);
+  }
 }
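
The SelectorEntry added above sorts candidates by decreasing score, breaking ties on the URL itself (see compareTo()). A minimal sketch of that ordering; the URLs and scores are illustrative:

    import org.apache.nutch.crawl.Generator.SelectorEntry;

    public class SelectorEntryOrderSketch {
      public static void main(String[] args) {
        SelectorEntry high = new SelectorEntry("http://example.com/a", 2.0f);
        SelectorEntry low  = new SelectorEntry("http://example.com/b", 1.0f);
        // compareTo() is negative for the higher-scoring entry, so entries
        // reach the reducer in decreasing-score order.
        System.out.println(high.compareTo(low));  // negative
        System.out.println(low.compareTo(high));  // positive
      }
    }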

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorMapper.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,78 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.nutch.crawl.Generator.SelectorEntry;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class GeneratorMapper 
+extends TableMapper<SelectorEntry, WebTableRow> {
+
+  private URLFilters filters;
+  private URLNormalizers normalizers;
+  private boolean filter;
+  private FetchSchedule schedule;
+  private ScoringFilters scoringFilters;
+  private long curTime;
+
+  @Override
+  public void map(ImmutableBytesWritable key, Result result,
+      Context context) throws IOException, InterruptedException {
+    String reversedUrl = Bytes.toString(key.get());
+    String url = TableUtil.unreverseUrl(reversedUrl);
+
+    WebTableRow row = new WebTableRow(result);
+
+    // If filtering is on don't generate URLs that don't pass URLFilters
+    try {
+      url = normalizers.normalize(url, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+      if (filter && filters.filter(url) == null)
+        return;
+    } catch (URLFilterException e) {
+      Generator.LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
+      return;
+    }
+
+    // check fetch schedule
+    if (!schedule.shouldFetch(url, row, curTime)) {
+      if (Generator.LOG.isDebugEnabled()) {
+        Generator.LOG.debug("-shouldFetch rejected '" + url + "', fetchTime=" + 
+            row.getFetchTime() + ", curTime=" + curTime);
+      }
+      return;
+    }
+    
+    float score = row.getScore();
+    try {
+      score = scoringFilters.generatorSortValue(url, row, score);
+    } catch (ScoringFilterException e) { 
+      // ignore
+    }
+    SelectorEntry entry = new SelectorEntry(url, score);
+    context.write(entry, row);
+
+  }
+
+  @Override
+  public void setup(Context context) {
+    Configuration conf = context.getConfiguration();
+    filters = new URLFilters(conf);
+    curTime = conf.getLong(Generator.CRAWL_GEN_CUR_TIME,
+        System.currentTimeMillis());
+    normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
+    filter = conf.getBoolean(Generator.CRAWL_GENERATE_FILTER, true);
+    schedule = FetchScheduleFactory.getFetchSchedule(conf);
+    scoringFilters = new ScoringFilters(conf);
+  }
+}
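
The normalize-then-filter gate at the top of map() can be exercised on its own; a minimal sketch, assuming the default URL normalizer and filter plugins are on the classpath and using an illustrative URL:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.net.URLFilters;
    import org.apache.nutch.net.URLNormalizers;
    import org.apache.nutch.util.NutchConfiguration;

    public class GenerateFilterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        URLNormalizers normalizers =
            new URLNormalizers(conf, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
        URLFilters filters = new URLFilters(conf);

        String url = "http://www.example.com/index.html";   // illustrative
        url = normalizers.normalize(url, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
        if (filters.filter(url) == null) {
          System.out.println("rejected: the mapper would return here");
        } else {
          System.out.println("accepted: " + url);
        }
      }
    }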

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/GeneratorReducer.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,68 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.nutch.crawl.Generator.SelectorEntry;
+import org.apache.nutch.util.hbase.WebTableRow;
+
+/** Reduce class for the generate job.
+ * 
+ * The #reduce() method writes a random integer to all generated URLs. This random
+ * number is then used by {@link FetcherMapper}.
+ *
+ */
+public class GeneratorReducer
+extends TableReducer<SelectorEntry, WebTableRow, SelectorEntry> {
+
+  private long limit;
+  private long maxPerHost;
+  private long count = 0;
+  private Map<String, Integer> hostCountMap = new HashMap<String, Integer>();
+  private Random random = new Random();
+
+  @Override
+  protected void reduce(SelectorEntry key, Iterable<WebTableRow> values,
+      Context context) throws IOException, InterruptedException {
+    for (WebTableRow row : values) {
+      if (maxPerHost > 0) {
+        String host = key.host;
+        Integer hostCount = hostCountMap.get(host);
+        if (hostCount == null) {
+          hostCountMap.put(host, 0);
+          hostCount = 0;
+        }
+        if (hostCount > maxPerHost) {
+          return;
+        }
+        hostCountMap.put(host, hostCount + 1);
+      }
+      if (count >= limit) {
+        return;
+      }
+      
+      row.putMeta(Generator.GENERATOR_MARK, Bytes.toBytes(random.nextInt()));
+      row.makeRowMutation().writeToContext(key, context);
+      count++;
+    }
+  }
+
+  @Override
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    long totalLimit = conf.getLong(Generator.CRAWL_TOP_N, Long.MAX_VALUE);
+    if (totalLimit == Long.MAX_VALUE) {
+      limit = Long.MAX_VALUE; 
+    } else {
+      limit = totalLimit / context.getNumReduceTasks();
+    }
+    maxPerHost = conf.getLong(Generator.GENERATE_MAX_PER_HOST, -1);
+  }
+  
+}
\ No newline at end of file
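
As a worked example of the quota computed in setup() above: with crawl.topN = 1000 and five reduce tasks, each reducer keeps at most 200 URLs. The numbers below are illustrative:

    public class GenerateQuotaSketch {
      public static void main(String[] args) {
        long totalLimit = 1000L;  // crawl.topN
        int numReduceTasks = 5;   // context.getNumReduceTasks()
        long limit = (totalLimit == Long.MAX_VALUE)
            ? Long.MAX_VALUE
            : totalLimit / numReduceTasks;
        System.out.println(limit);  // 200: per-reducer cap on generated URLs
      }
    }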

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Injector.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Injector.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Injector.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Injector.java Sun Aug 16 22:25:12 2009
@@ -1,198 +1,223 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.nutch.crawl;
 
-import java.io.*;
-import java.util.*;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
-// Commons Logging imports
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-
-import org.apache.nutch.net.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.scoring.ScoringFilterException;
 import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.LogUtil;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
 
-/** This class takes a flat file of URLs and adds them to the of pages to be
- * crawled.  Useful for bootstrapping the system. */
-public class Injector extends Configured implements Tool {
-  public static final Log LOG = LogFactory.getLog(Injector.class);
+public class Injector
+implements Tool {
 
-
-  /** Normalize and filter injected urls. */
-  public static class InjectMapper implements Mapper<WritableComparable, Text, Text, CrawlDatum> {
+  public static final Log LOG = LogFactory.getLog(Injector.class);
+  
+  private static final String INJECT_KEY_STR = "__injkey__";
+  public static final byte[] INJECT_KEY =
+    Bytes.toBytes(INJECT_KEY_STR);
+  
+  private static final Set<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+  
+  static {
+    COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA, INJECT_KEY));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.STATUS));
+  }
+  
+  private Configuration conf;
+  
+  public static class UrlMapper
+  extends Mapper<LongWritable, Text, Text, Text> {
     private URLNormalizers urlNormalizers;
-    private int interval;
-    private float scoreInjected;
-    private JobConf jobConf;
     private URLFilters filters;
-    private ScoringFilters scfilters;
-    private long curTime;
+    private HTable table;
+    private HBaseConfiguration hbaseConf;
 
-    public void configure(JobConf job) {
-      this.jobConf = job;
-      urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
-      interval = jobConf.getInt("db.fetch.interval.default", 2592000);
-      filters = new URLFilters(jobConf);
-      scfilters = new ScoringFilters(jobConf);
-      scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
-      curTime = job.getLong("injector.current.time", System.currentTimeMillis());
-    }
-
-    public void close() {}
-
-    public void map(WritableComparable key, Text value,
-                    OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-      throws IOException {
-      String url = value.toString();              // value is line of text
+    @Override
+    public void map(LongWritable key, Text value, Context context) 
+    throws IOException {
+      if (table == null) {
+        throw new IOException("Can not connect to hbase table");
+      }
+      String url = value.toString();
+      String reversedUrl;
       try {
         url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
-        url = filters.filter(url);             // filter the url
-      } catch (Exception e) {
-        if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); }
-        url = null;
-      }
-      if (url != null) {                          // if it passes
-        value.set(url);                           // collect it
-        CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval);
-        datum.setFetchTime(curTime);
-        datum.setScore(scoreInjected);
-        try {
-          scfilters.injectedScore(value, datum);
-        } catch (ScoringFilterException e) {
-          if (LOG.isWarnEnabled()) {
-            LOG.warn("Cannot filter injected score for url " + url +
-                     ", using default (" + e.getMessage() + ")");
-          }
-          datum.setScore(scoreInjected);
+        url = filters.filter(url);
+        if (url == null) {
+          return;
         }
-        output.collect(value, datum);
+        reversedUrl = TableUtil.reverseUrl(url);
+      } catch (Exception e) {
+        LOG.warn("Skipping " + url + ":" + e);
+        return;
       }
-    }
-  }
 
-  /** Combine multiple new entries for a url. */
-  public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
-    public void configure(JobConf job) {}    
-    public void close() {}
+      Put put = new Put(Bytes.toBytes(reversedUrl));
+      put.add(WebTableColumns.METADATA, INJECT_KEY, TableUtil.YES_VAL);
 
-    private CrawlDatum old = new CrawlDatum();
-    private CrawlDatum injected = new CrawlDatum();
-    
-    public void reduce(Text key, Iterator<CrawlDatum> values,
-                       OutputCollector<Text, CrawlDatum> output, Reporter reporter)
-      throws IOException {
-      boolean oldSet = false;
-      while (values.hasNext()) {
-        CrawlDatum val = values.next();
-        if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
-          injected.set(val);
-          injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
-        } else {
-          old.set(val);
-          oldSet = true;
-        }
+      table.put(put);
+    }
+
+    @Override
+    public void setup(Context context) {  
+      Configuration conf = context.getConfiguration();
+      urlNormalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_INJECT);
+      filters = new URLFilters(conf);
+      hbaseConf = new HBaseConfiguration();
+      try {
+        table = new HTable(hbaseConf, conf.get("input.table") );
+      } catch (IOException e) {
+        e.printStackTrace(LogUtil.getFatalStream(LOG));
       }
-      CrawlDatum res = null;
-      if (oldSet) res = old; // don't overwrite existing value
-      else res = injected;
 
-      output.collect(key, res);
     }
-  }
+    
+    @Override
+    public void cleanup(Context context) throws IOException {
+      table.close();
+    }
 
-  public Injector() {}
-  
-  public Injector(Configuration conf) {
-    setConf(conf);
   }
   
-  public void inject(Path crawlDb, Path urlDir) throws IOException {
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Injector: starting");
-      LOG.info("Injector: crawlDb: " + crawlDb);
-      LOG.info("Injector: urlDir: " + urlDir);
+  public static class InjectorMapper
+  extends TableMapper<Text, Text> {
+    private HTable table;
+    private float scoreInjected;
+    private FetchSchedule schedule;
+    private ScoringFilters scoringFilters;
+    
+    @Override
+    public void setup(Context context) throws IOException {
+      Configuration conf = context.getConfiguration();
+      schedule = FetchScheduleFactory.getFetchSchedule(conf);
+      table = new HTable(conf.get(TableInputFormat.INPUT_TABLE));
+      scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+      scoringFilters = new ScoringFilters(conf);
     }
-
-    Path tempDir =
-      new Path(getConf().get("mapred.temp.dir", ".") +
-               "/inject-temp-"+
-               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
-    // map text input file to a <url,CrawlDatum> file
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Injector: Converting injected urls to crawl db entries.");
+    
+    @Override
+    public void map(ImmutableBytesWritable key, Result result, Context context)
+    throws IOException {
+      WebTableRow row = new WebTableRow(result);
+      row.deleteMeta(INJECT_KEY);
+      if (!row.hasColumn(WebTableColumns.STATUS, null)) {
+        String url = TableUtil.unreverseUrl(Bytes.toString(key.get()));
+        // this is a new column so add necessary fields
+        row.setStatus(CrawlDatumHbase.STATUS_UNFETCHED);
+        schedule.initializeSchedule(url, row);
+        try {
+          scoringFilters.injectedScore(url, row);
+        } catch (ScoringFilterException e) {
+          row.setScore(scoreInjected);
+        }
+      }
+      row.makeRowMutation().commit(table);
     }
-    JobConf sortJob = new NutchJob(getConf());
-    sortJob.setJobName("inject " + urlDir);
-    FileInputFormat.addInputPath(sortJob, urlDir);
-    sortJob.setMapperClass(InjectMapper.class);
-
-    FileOutputFormat.setOutputPath(sortJob, tempDir);
-    sortJob.setOutputFormat(SequenceFileOutputFormat.class);
-    sortJob.setOutputKeyClass(Text.class);
-    sortJob.setOutputValueClass(CrawlDatum.class);
-    sortJob.setLong("injector.current.time", System.currentTimeMillis());
-    JobClient.runJob(sortJob);
-
-    // merge with existing crawl db
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Injector: Merging injected urls into crawl db.");
+    
+    @Override
+    public void cleanup(Context context) throws IOException {
+      table.close();
     }
-    JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
-    FileInputFormat.addInputPath(mergeJob, tempDir);
-    mergeJob.setReducerClass(InjectReducer.class);
-    JobClient.runJob(mergeJob);
-    CrawlDb.install(mergeJob, crawlDb);
+  }
 
-    // clean up
-    FileSystem fs = FileSystem.get(getConf());
-    fs.delete(tempDir, true);
-    if (LOG.isInfoEnabled()) { LOG.info("Injector: done"); }
+  public boolean inject(String table, Path urlDir)
+  throws IOException, InterruptedException, ClassNotFoundException {
+
+    LOG.info("Injector: starting");
+    LOG.info("Injector: urlDir: " + urlDir);
+
+    getConf().setLong("injector.current.time", System.currentTimeMillis());
+    getConf().set("input.table", table);
+    Job job = new NutchJob(getConf(), "inject-hbase-p1 " + urlDir);
+    FileInputFormat.addInputPath(job, urlDir);
+    job.setMapperClass(UrlMapper.class);
+    TableMapReduceUtil.initTableReducerJob(table,
+        IdentityTableReducer.class, job);
+    job.setOutputFormatClass(NullOutputFormat.class);
+
+    if (!job.waitForCompletion(true)) {
+      LOG.warn("Injecting new users failed!");
+      return false;
+    }
+    job = new NutchJob(getConf(), "inject-hbase-p2 " + urlDir);
+
+    Scan scan = TableUtil.createScanFromColumns(COLUMNS);
+    scan.setFilter(new ValueFilter(WebTableColumns.METADATA, INJECT_KEY, ValueFilter.CompareOp.EQUAL, TableUtil.YES_VAL, true));
+    TableMapReduceUtil.initTableMapperJob(table,
+        scan, InjectorMapper.class, Text.class, Text.class, job);
+    TableMapReduceUtil.initTableReducerJob(table,
+        IdentityTableReducer.class, job);
+    if (job.waitForCompletion(true)) {
+      LOG.info("Injector: done");
+      return true;
+    }
+    return false;
+  }
+  
 
+  @Override
+  public Configuration getConf() {
+    return conf;
   }
 
-  public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
-    System.exit(res);
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
   }
-  
+
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println("Usage: Injector <crawldb> <url_dir>");
+      System.err.println("Usage: Injector <webtable> <url_dir>");
       return -1;
     }
     try {
-      inject(new Path(args[0]), new Path(args[1]));
-      return 0;
+      if (inject(args[0], new Path(args[1]))) {
+        return 0;
+      }
+      return -1;
     } catch (Exception e) {
       LOG.fatal("Injector: " + StringUtils.stringifyException(e));
       return -1;
     }
   }
 
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(),
+        new Injector(), args);
+    System.exit(res);
+  }
 }
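
The rewritten Injector is still an ordinary Hadoop Tool, so it can be launched programmatically the same way its main() does. A minimal sketch; the table name "webpage" and seed directory "urls" are assumptions, not part of this commit:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.crawl.Injector;
    import org.apache.nutch.util.NutchConfiguration;

    public class InjectDriverSketch {
      public static void main(String[] args) throws Exception {
        // Equivalent to: Injector <webtable> <url_dir>
        int res = ToolRunner.run(NutchConfiguration.create(), new Injector(),
            new String[] { "webpage", "urls" });
        System.exit(res);
      }
    }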

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/MD5Signature.java Sun Aug 16 22:25:12 2009
@@ -17,9 +17,14 @@
 
 package org.apache.nutch.crawl;
 
+import java.util.Collection;
+import java.util.HashSet;
+
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.nutch.parse.Parse;
-import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.WebTableRow;
 
 /**
  * Default implementation of a page signature. It calculates an MD5 hash
@@ -30,10 +35,20 @@
  */
 public class MD5Signature extends Signature {
 
-  public byte[] calculate(Content content, Parse parse) {
-    byte[] data = content.getContent();
-    if (data == null) data = content.getUrl().getBytes();
-    StringBuilder buf = new StringBuilder().append(data).append(parse.getText());
-    return MD5Hash.digest(buf.toString().getBytes()).getDigest();
+  private final static Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+  
+  static {
+    COLUMNS.add(new HbaseColumn(WebTableColumns.CONTENT));
+  }
+  
+  @Override
+  public byte[] calculate(WebTableRow row, Parse parse) {
+    byte[] data = row.getContent();
+    return MD5Hash.digest(data).getDigest();
+  }
+
+  @Override
+  public Collection<HbaseColumn> getColumns() {
+    return COLUMNS;
   }
 }
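
The new calculate() simply feeds the raw content bytes to Hadoop's MD5Hash; a standalone sketch of that digest call, with illustrative content:

    import org.apache.hadoop.io.MD5Hash;

    public class Md5DigestSketch {
      public static void main(String[] args) {
        byte[] content = "<html>hello</html>".getBytes();        // illustrative
        byte[] signature = MD5Hash.digest(content).getDigest();  // 16-byte MD5
        System.out.println(signature.length);                    // 16
        System.out.println(MD5Hash.digest(content));             // hex form
      }
    }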

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/NutchWritable.java Sun Aug 16 22:25:12 2009
@@ -41,13 +41,15 @@
       org.apache.nutch.parse.Outlink.class,
       org.apache.nutch.parse.ParseText.class,
       org.apache.nutch.parse.ParseData.class,
-      org.apache.nutch.parse.ParseImpl.class,
       org.apache.nutch.parse.ParseStatus.class,
       org.apache.nutch.protocol.Content.class,
       org.apache.nutch.protocol.ProtocolStatus.class,
       org.apache.nutch.searcher.Hit.class,
       org.apache.nutch.searcher.HitDetails.class,
-      org.apache.nutch.searcher.Hits.class
+      org.apache.nutch.searcher.Hits.class,
+      org.apache.nutch.scoring.ScoreDatum.class,
+      org.apache.nutch.util.hbase.WebTableRow.class,
+      org.apache.hadoop.hbase.client.Result.class
     };
   }
 

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/PartitionSelectorByHost.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/PartitionSelectorByHost.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/PartitionSelectorByHost.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/PartitionSelectorByHost.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,16 @@
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.nutch.crawl.Generator.SelectorEntry;
+import org.apache.nutch.util.hbase.WebTableRow;
+
+public class PartitionSelectorByHost extends Partitioner<SelectorEntry, WebTableRow> {
+
+  @Override
+  public int getPartition(SelectorEntry key, WebTableRow value,
+      int numPartitions) {
+    int hashCode = key.host.hashCode();
+
+    return (hashCode & Integer.MAX_VALUE) % numPartitions;  
+  }
+}
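
A worked example of the partition computation above; the host and partition count are illustrative, and masking with Integer.MAX_VALUE keeps the remainder non-negative even for negative hash codes:

    public class HostPartitionSketch {
      public static void main(String[] args) {
        String host = "www.example.com";  // key.host as set by SelectorEntry
        int numPartitions = 4;
        int partition = (host.hashCode() & Integer.MAX_VALUE) % numPartitions;
        // Every URL from this host maps to the same partition, so one reducer
        // (and later one fetch list) owns the whole host -- the politeness goal.
        System.out.println(partition);
      }
    }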

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/Signature.java Sun Aug 16 22:25:12 2009
@@ -17,21 +17,16 @@
 
 package org.apache.nutch.crawl;
 
+import java.util.Collection;
+
 import org.apache.nutch.parse.Parse;
-import org.apache.nutch.protocol.Content;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configurable;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.hadoop.conf.Configured;
 
-public abstract class Signature implements Configurable {
-  protected Configuration conf;
+public abstract class Signature extends Configured {
   
-  public abstract byte[] calculate(Content content, Parse parse);
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
+  public abstract byte[] calculate(WebTableRow row, Parse parse);
+  
+  public abstract Collection<HbaseColumn> getColumns();
 }

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureComparator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureComparator.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureComparator.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureComparator.java Sun Aug 16 22:25:12 2009
@@ -17,23 +17,13 @@
 
 package org.apache.nutch.crawl;
 
-import java.util.Comparator;
-
-public class SignatureComparator implements Comparator {
-  public int compare(Object o1, Object o2) {
-    return _compare(o1, o2);
-  }
+public class SignatureComparator {
+  public static int compare(byte[] data1, byte[] data2) {
+    if (data1 == null && data2 == null) return 0;
+    if (data1 == null) return -1;
+    if (data2 == null) return 1;
+    return _compare(data1, 0, data1.length, data2, 0, data2.length);  }
   
-  public static int _compare(Object o1, Object o2) {
-    if (o1 == null && o2 == null) return 0;
-    if (o1 == null) return -1;
-    if (o2 == null) return 1;
-    if (!(o1 instanceof byte[])) return -1;
-    if (!(o2 instanceof byte[])) return 1;
-    byte[] data1 = (byte[])o1;
-    byte[] data2 = (byte[])o2;
-    return _compare(data1, 0, data1.length, data2, 0, data2.length);
-  }
   
   public static int _compare(byte[] data1, int s1, int l1, byte[] data2, int s2, int l2) {
     if (l2 > l1) return -1;

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/SignatureFactory.java Sun Aug 16 22:25:12 2009
@@ -18,12 +18,15 @@
 package org.apache.nutch.crawl;
 
 // Commons Logging imports
+import java.util.Collection;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 // Hadoop imports
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nutch.util.ObjectCache;
+import org.apache.nutch.util.hbase.HbaseColumn;
 
 /**
  * Factory class, which instantiates a Signature implementation according to the
@@ -44,10 +47,8 @@
     Signature impl = (Signature)objectCache.getObject(clazz);
     if (impl == null) {
       try {
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Using Signature impl: " + clazz);
-        }
-        Class implClass = Class.forName(clazz);
+        LOG.info("Using Signature impl: " + clazz);
+        Class<?> implClass = Class.forName(clazz);
         impl = (Signature)implClass.newInstance();
         impl.setConf(conf);
         objectCache.setObject(clazz, impl);
@@ -57,4 +58,9 @@
     }
     return impl;
   }
+  
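+  /** Returns the columns required by the configured {@link Signature} implementation. */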
+  public static Collection<HbaseColumn> getColumns(Configuration conf) {
+    Signature impl = getSignature(conf);
+    return impl.getColumns();
+  }
 }

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateMapper.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateMapper.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateMapper.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateMapper.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,67 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.scoring.ScoreDatum;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.TableUtil;
+
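+/**
+ * Reads rows from the web table, turns each outlink into a {@link ScoreDatum},
+ * lets the scoring filters distribute the page's score among them, and emits
+ * the row under its own key plus one score datum per outlink, keyed by the
+ * reversed outlink URL.
+ */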
+public class TableUpdateMapper
+extends TableMapper<ImmutableBytesWritable, NutchWritable> {
+  public static final Log LOG = TableUpdater.LOG;
+
+  private ScoringFilters scoringFilters;
+  
+  private List<ScoreDatum> scoreData = new ArrayList<ScoreDatum>();
+
+  @Override
+  public void map(ImmutableBytesWritable key, Result result, Context context)
+  throws IOException, InterruptedException {
+
+    String url = TableUtil.unreverseUrl(Bytes.toString(key.get()));
+    WebTableRow row = new WebTableRow(result);
+
+    Collection<Outlink> outlinks = row.getOutlinks();
+
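+    // one score datum per outlink; the scoring filters distribute the page's score among them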
+    scoreData.clear();
+    for (Outlink outlink : outlinks) {
+      scoreData.add(new ScoreDatum(0.0f, outlink.getToUrl(), outlink.getAnchor()));
+    }
+
+    // TODO: Outlink filtering (i.e. "only keep the first n outlinks")
+    try {
+      scoringFilters.distributeScoreToOutlinks(url, row, scoreData, outlinks.size());
+    } catch (ScoringFilterException e) {
+      LOG.warn("Distributing score failed for URL: " + key +
+          " exception:" + StringUtils.stringifyException(e));
+    }
+
+    context.write(key, new NutchWritable(row));
+
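+    // key each score datum by the reversed outlink URL so it meets the target
+    // row in the reducer; the datum's URL is rewritten to the source page URL
+    // so the reducer can record it as an inlink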
+    for (ScoreDatum scoreDatum : scoreData) {
+      String reversedOut = TableUtil.reverseUrl(scoreDatum.getUrl());
+      ImmutableBytesWritable outKey =
+        new ImmutableBytesWritable(reversedOut.getBytes());
+      scoreDatum.setUrl(url);
+      context.write(outKey, new NutchWritable(scoreDatum));
+    }
+  }
+
+  @Override
+  public void setup(Context context) {
+    scoringFilters = new ScoringFilters(context.getConfiguration());
+  }
+
+}

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateReducer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateReducer.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateReducer.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdateReducer.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,163 @@
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.FetchSchedule;
+import org.apache.nutch.crawl.Inlink;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.SignatureComparator;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.parse.TableParser;
+import org.apache.nutch.scoring.ScoreDatum;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+import org.apache.nutch.util.hbase.WebTableRow;
+
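+/**
+ * Merges a web table row with the score data collected from its inlinks:
+ * initializes newly discovered rows, updates the fetch schedule and status of
+ * existing ones, rewrites the inlink list, runs the scoring filters, clears
+ * the generate/fetch/parse markers and writes the row back to the table.
+ */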
+public class TableUpdateReducer
+extends TableReducer<ImmutableBytesWritable, NutchWritable, ImmutableBytesWritable> {
+  
+  public static final Log LOG = TableUpdater.LOG;
+
+  private int retryMax;
+  private boolean additionsAllowed;
+  private int maxInterval;
+  private FetchSchedule schedule;
+  private ScoringFilters scoringFilters;
+  private List<ScoreDatum> inlinkedScoreData = new ArrayList<ScoreDatum>();
+  
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    retryMax = conf.getInt("db.fetch.retry.max", 3);
+    additionsAllowed = conf.getBoolean(CrawlDb.CRAWLDB_ADDITIONS_ALLOWED, true);
+    maxInterval = conf.getInt("db.fetch.interval.max", 0);
+    schedule = FetchScheduleFactory.getFetchSchedule(conf);
+    scoringFilters = new ScoringFilters(conf);
+  }
+  
+  @Override
+  protected void reduce(ImmutableBytesWritable key, Iterable<NutchWritable> values,
+      Context context) throws IOException, InterruptedException {
+
+    WebTableRow row = null;
+    inlinkedScoreData.clear();
+
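+    // separate the row itself from the score data contributed by its inlinks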
+    for (NutchWritable nutchWritable : values) {
+      Writable val = nutchWritable.get();
+      if (val instanceof WebTableRow) {
+        row = (WebTableRow) val;
+      } else {
+        inlinkedScoreData.add((ScoreDatum) val);
+      }
+    }
+    String url;
+    try {
+      url = TableUtil.unreverseUrl(Bytes.toString(key.get()));
+    } catch (Exception e) {
+      // this can happen because a newly discovered malformed link
+      // may slip by url filters
+      // TODO: Find a better solution
+      return;
+    }
+
+    if (row == null) { // new row
+      if (!additionsAllowed) {
+        return;
+      }
+      row = new WebTableRow(key.get());
+      schedule.initializeSchedule(url, row);
+      row.setStatus(CrawlDatumHbase.STATUS_UNFETCHED);
+      try {
+        scoringFilters.initialScore(url, row);
+      } catch (ScoringFilterException e) {
+        row.setScore(0.0f);
+      }
+    } else {
+      if (row.hasMeta(Fetcher.REDIRECT_DISCOVERED) && !row.hasColumn(WebTableColumns.STATUS, null)) {
+        // this row is marked during fetch as the destination of a redirect
+        // but does not contain anything else, so we initialize it.
+        schedule.initializeSchedule(url, row);
+        row.setStatus(CrawlDatumHbase.STATUS_UNFETCHED);
+        try {
+          scoringFilters.initialScore(url, row);
+        } catch (ScoringFilterException e) {
+          row.setScore(0.0f);
+        }
+      } else { // update row
+        byte status = row.getStatus();
+        switch (status) {
+        case CrawlDatumHbase.STATUS_FETCHED:         // successful fetch
+        case CrawlDatumHbase.STATUS_REDIR_TEMP:      // successful fetch, redirected
+        case CrawlDatumHbase.STATUS_REDIR_PERM:
+        case CrawlDatumHbase.STATUS_NOTMODIFIED:     // successful fetch, notmodified
+          int modified = FetchSchedule.STATUS_UNKNOWN;
+          if (status == CrawlDatumHbase.STATUS_NOTMODIFIED) {
+            modified = FetchSchedule.STATUS_NOTMODIFIED;
+          }
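+          // compare the current signature with the previous one to detect content changes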
+          byte[] prevSig = row.getPrevSignature();
+          byte[] signature = row.getSignature();
+          if (prevSig != null && signature != null) {
+            if (SignatureComparator.compare(prevSig, signature) != 0) {
+              modified = FetchSchedule.STATUS_MODIFIED;
+            } else {
+              modified = FetchSchedule.STATUS_NOTMODIFIED;
+            }
+          }
+          long fetchTime = row.getFetchTime();
+          long prevFetchTime = row.getPrevFetchTime();
+          long modifiedTime = row.getModifiedTime();
+
+          schedule.setFetchSchedule(url, row, prevFetchTime, 0L,
+              fetchTime, modifiedTime, modified);
+          if (maxInterval < row.getFetchInterval())
+            schedule.forceRefetch(url, row, false);
+          break;
+        case CrawlDatumHbase.STATUS_RETRY:
+          schedule.setPageRetrySchedule(url, row, 0L, 0L, row.getFetchTime());
+          if (row.getRetriesSinceFetch() < retryMax) {
+            row.setStatus(CrawlDatumHbase.STATUS_UNFETCHED);
+          } else {
+            row.setStatus(CrawlDatumHbase.STATUS_GONE);
+          }
+          break;
+        case CrawlDatumHbase.STATUS_GONE:
+          schedule.setPageGoneSchedule(url, row, 0L, 0L, row.getFetchTime());
+          break;
+        }
+      }
+    }
+
+    row.deleteAllInlinks();
+    for (ScoreDatum inlink : inlinkedScoreData) {
+      row.addInlink(new Inlink(inlink.getUrl(), inlink.getAnchor()));
+    }
+    
+    try {
+      scoringFilters.updateScore(url, row, inlinkedScoreData);
+    } catch (ScoringFilterException e) {
+      LOG.warn("Scoring filters failed with exception " +
+                StringUtils.stringifyException(e));
+    }
+
+    // clear markers
+    row.deleteMeta(Fetcher.REDIRECT_DISCOVERED);
+    row.deleteMeta(Generator.GENERATOR_MARK);
+    row.deleteMeta(Fetcher.FETCH_MARK);
+    row.deleteMeta(TableParser.PARSE_MARK);
+
+    row.makeRowMutation(key.get()).writeToContext(key, context);
+  }
+
+}

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdater.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdater.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdater.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TableUpdater.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,81 @@
+package org.apache.nutch.crawl;
+
+import java.util.Collection;
+import java.util.HashSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.parse.TableParser;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+
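+/**
+ * Map/reduce job that updates the web table after a fetch/parse cycle, scanning
+ * only the columns needed by {@link TableUpdateMapper}, {@link TableUpdateReducer}
+ * and the configured scoring filters.
+ */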
+public class TableUpdater extends Configured
+implements Tool {
+
+  public static final Log LOG = LogFactory.getLog(TableUpdater.class);
+
+
+  private static final Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+
+  static {
+    COLUMNS.add(new HbaseColumn(WebTableColumns.OUTLINKS));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.INLINKS));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.STATUS));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA,  TableParser.PARSE_MARK));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA,  Fetcher.REDIRECT_DISCOVERED));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.RETRIES));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.FETCH_TIME));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.MODIFIED_TIME));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.FETCH_INTERVAL));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.PREV_FETCH_TIME));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.PREV_SIGNATURE));
+  }
+
+  private void updateTable(String table) throws Exception {
+    LOG.info("TableUpdater: starting");
+    LOG.info("TableUpdater: table: " + table);
+    Job job = new NutchJob(getConf(), "update-table " + table);
+    //job.setBoolean(ALL, updateAll);
+    job.setJobName("update-table " + table);
+    ScoringFilters scoringFilters = new ScoringFilters(getConf());
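+    // the scan must also cover any columns required by the scoring filters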
+    HashSet<HbaseColumn> columns = new HashSet<HbaseColumn>(COLUMNS);
+    columns.addAll(scoringFilters.getColumns());
+    TableMapReduceUtil.initTableMapperJob(table, TableUtil.createScanFromColumns(columns), 
+        TableUpdateMapper.class, ImmutableBytesWritable.class, 
+        NutchWritable.class, job);
+    TableMapReduceUtil.initTableReducerJob(table, TableUpdateReducer.class, job);
+
+    job.waitForCompletion(true);
+    LOG.info("TableUpdater: done");
+  }
+
+  public int run(String[] args) throws Exception {
+    String usage = "Usage: TableUpdater <webtable>";
+
+    if (args.length < 1) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    updateTable(args[0]);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(NutchConfiguration.create(), new TableUpdater(), args);
+    System.exit(res);
+  }
+
+}

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/crawl/TextProfileSignature.java Sun Aug 16 22:25:12 2009
@@ -17,22 +17,20 @@
 
 package org.apache.nutch.crawl;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStreamReader;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 
 import org.apache.hadoop.io.MD5Hash;
+
 import org.apache.nutch.parse.Parse;
-import org.apache.nutch.parse.ParseImpl;
-import org.apache.nutch.protocol.Content;
-import org.apache.nutch.util.StringUtil;
-import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.WebTableRow;
 
 /**
  * <p>An implementation of a page signature. It calculates an MD5 hash
@@ -60,16 +58,22 @@
  * @author Andrzej Bialecki &lt;ab@getopt.org&gt;
  */
 public class TextProfileSignature extends Signature {
+
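+  /** Web table columns required by this signature implementation. */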
+  private final static Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+  
+  static {
+    COLUMNS.add(new HbaseColumn(WebTableColumns.CONTENT));
+  }
   
   Signature fallback = new MD5Signature();
 
-  public byte[] calculate(Content content, Parse parse) {
+  public byte[] calculate(WebTableRow row, Parse parse) {
     int MIN_TOKEN_LEN = getConf().getInt("db.signature.text_profile.min_token_len", 2);
     float QUANT_RATE = getConf().getFloat("db.signature.text_profile.quant_rate", 0.01f);
     HashMap<String, Token> tokens = new HashMap<String, Token>();
     String text = null;
     if (parse != null) text = parse.getText();
-    if (text == null || text.length() == 0) return fallback.calculate(content, parse);
+    if (text == null || text.length() == 0) return fallback.calculate(row, parse);
     StringBuffer curToken = new StringBuffer();
     int maxFreq = 0;
     for (int i = 0; i < text.length(); i++) {
@@ -133,6 +137,10 @@
     }
     return MD5Hash.digest(newText.toString()).getDigest();
   }
+
+  public Collection<HbaseColumn> getColumns() {
+    return COLUMNS;
+  }
   
   private static class Token {
     public int cnt;
@@ -153,30 +161,4 @@
       return t2.cnt - t1.cnt;
     }
   }
-  
-  public static void main(String[] args) throws Exception {
-    TextProfileSignature sig = new TextProfileSignature();
-    sig.setConf(NutchConfiguration.create());
-    HashMap<String, byte[]> res = new HashMap<String, byte[]>();
-    File[] files = new File(args[0]).listFiles();
-    for (int i = 0; i < files.length; i++) {
-      FileInputStream fis = new FileInputStream(files[i]);
-      BufferedReader br = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
-      StringBuffer text = new StringBuffer();
-      String line = null;
-      while ((line = br.readLine()) != null) {
-        if (text.length() > 0) text.append("\n");
-        text.append(line);
-      }
-      br.close();
-      byte[] signature = sig.calculate(null, new ParseImpl(text.toString(), null));
-      res.put(files[i].toString(), signature);
-    }
-    Iterator<String> it = res.keySet().iterator();
-    while (it.hasNext()) {
-      String name = it.next();
-      byte[] signature = res.get(name);
-      System.out.println(name + "\t" + StringUtil.toHexString(signature));
-    }
-  }
 }

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetchEntry.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,49 @@
+package org.apache.nutch.fetcher;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+
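+/**
+ * Pairs a web table row key with its {@link Result} so that the two can be
+ * passed through the map/reduce framework as a single {@link Writable}.
+ */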
+public class FetchEntry implements Writable {
+
+  private ImmutableBytesWritable key;
+  private Result row;
+  
+  public FetchEntry() {
+    key = new ImmutableBytesWritable();
+    row = new Result();
+  }
+  
+  public FetchEntry(FetchEntry fe) {
+    this.key = new ImmutableBytesWritable(fe.key.get().clone());
+    this.row = fe.row; // the Result is shared, not deep-copied
+  }
+  
+  public FetchEntry(ImmutableBytesWritable key, Result row) {
+    this.key = key;
+    this.row = row;
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    key.readFields(in);
+    row.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    key.write(out);
+    row.write(out);
+  }
+
+  public ImmutableBytesWritable getKey() {
+    return key;
+  }
+
+  public Result getRow() {
+    return row;
+  }
+}