Posted to commits@nutch.apache.org by do...@apache.org on 2009/08/17 00:25:17 UTC

svn commit: r804789 [3/6] - in /lucene/nutch/branches/nutchbase: ./ bin/ conf/ lib/ src/java/org/apache/nutch/analysis/ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/ src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/ind...

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/Fetcher.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/Fetcher.java Sun Aug 16 22:25:12 2009
@@ -1,1042 +1,161 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.nutch.fetcher;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 
-// Commons Logging imports
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.ValueFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
 
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.StringUtils;
-
-import org.apache.nutch.crawl.CrawlDatum;
-import org.apache.nutch.crawl.NutchWritable;
-import org.apache.nutch.crawl.SignatureFactory;
-import org.apache.nutch.metadata.Metadata;
-import org.apache.nutch.metadata.Nutch;
-import org.apache.nutch.net.*;
-import org.apache.nutch.protocol.*;
-import org.apache.nutch.parse.*;
-import org.apache.nutch.scoring.ScoringFilters;
-import org.apache.nutch.util.*;
-
-
-/** 
- * A queue-based fetcher.
- * 
- * <p>This fetcher uses a well-known model of one producer (a QueueFeeder)
- * and many consumers (FetcherThread-s).
- * 
- * <p>QueueFeeder reads input fetchlists and
- * populates a set of FetchItemQueue-s, which hold FetchItem-s that
- * describe the items to be fetched. There are as many queues as there are unique
- * hosts, but at any given time the total number of fetch items in all queues
- * is less than a fixed number (currently set to a multiple of the number of
- * threads).
- * 
- * <p>As items are consumed from the queues, the QueueFeeder continues to add new
- * input items, so that their total count stays fixed (FetcherThread-s may also
- * add new items to the queues e.g. as a results of redirection) - until all
- * input items are exhausted, at which point the number of items in the queues
- * begins to decrease. When this number reaches 0 fetcher will finish.
- * 
- * <p>This fetcher implementation handles per-host blocking itself, instead
- * of delegating this work to protocol-specific plugins.
- * Each per-host queue handles its own "politeness" settings, such as the
- * maximum number of concurrent requests and crawl delay between consecutive
- * requests - and also a list of requests in progress, and the time the last
- * request was finished. As FetcherThread-s ask for new items to be fetched,
- * queues may return eligible items or null if for "politeness" reasons this
- * host's queue is not yet ready.
- * 
- * <p>If there are still unfetched items in the queues, but none of the items
- * are ready, FetcherThread-s will spin-wait until either some items become
- * available, or a timeout is reached (at which point the Fetcher will abort,
- * assuming the task is hung).
- * 
- * @author Andrzej Bialecki
+/** Multi-threaded fetcher.
+ *
  */
-public class Fetcher extends Configured implements
-    MapRunnable<Text, CrawlDatum, Text, NutchWritable> { 
-
-  public static final int PERM_REFRESH_TIME = 5;
-
-  public static final String CONTENT_REDIR = "content";
-
-  public static final String PROTOCOL_REDIR = "protocol";
-
-  public static final Log LOG = LogFactory.getLog(Fetcher.class);
+public class Fetcher
+implements Tool {
   
-  public static class InputFormat extends SequenceFileInputFormat<Text, CrawlDatum> {
-    /** Don't split inputs, to keep things polite. */
-    public InputSplit[] getSplits(JobConf job, int nSplits)
-      throws IOException {
-      FileStatus[] files = listStatus(job);
-      FileSplit[] splits = new FileSplit[files.length];
-      for (int i = 0; i < files.length; i++) {
-        FileStatus cur = files[i];
-        splits[i] = new FileSplit(cur.getPath(), 0,
-            cur.getLen(), (String[])null);
-      }
-      return splits;
-    }
-  }
-
-  private OutputCollector<Text, NutchWritable> output;
-  private Reporter reporter;
+  public static final String PROTOCOL_REDIR = "protocol";
   
-  private String segmentName;
-  private AtomicInteger activeThreads = new AtomicInteger(0);
-  private AtomicInteger spinWaiting = new AtomicInteger(0);
-
-  private long start = System.currentTimeMillis(); // start time of fetcher run
-  private AtomicLong lastRequestStart = new AtomicLong(start);
+  public static final int PERM_REFRESH_TIME = 5;
 
-  private AtomicLong bytes = new AtomicLong(0);        // total bytes fetched
-  private AtomicInteger pages = new AtomicInteger(0);  // total pages fetched
-  private AtomicInteger errors = new AtomicInteger(0); // total pages errored
+  public static final byte[] REDIRECT_DISCOVERED =
+    Bytes.toBytes("___rdrdsc__");
 
-  private boolean storingContent;
-  private boolean parsing;
-  FetchItemQueues fetchQueues;
-  QueueFeeder feeder;
+  public static final byte[] FETCH_MARK =
+    Bytes.toBytes("__ftchmrk__");
   
-  /**
-   * This class described the item to be fetched.
-   */
-  private static class FetchItem {    
-    String queueID;
-    Text url;
-    URL u;
-    CrawlDatum datum;
-    
-    public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
-      this.url = url;
-      this.u = u;
-      this.datum = datum;
-      this.queueID = queueID;
-    }
-    
-    /** Create an item. Queue id will be created based on <code>byIP</code>
-     * argument, either as a protocol + hostname pair, or protocol + IP
-     * address pair.
-     */
-    public static FetchItem create(Text url, CrawlDatum datum, boolean byIP) {
-      String queueID;
-      URL u = null;
-      try {
-        u = new URL(url.toString());
-      } catch (Exception e) {
-        LOG.warn("Cannot parse url: " + url, e);
-        return null;
-      }
-      String proto = u.getProtocol().toLowerCase();
-      String host;
-      if (byIP) {
-        try {
-          InetAddress addr = InetAddress.getByName(u.getHost());
-          host = addr.getHostAddress();
-        } catch (UnknownHostException e) {
-          // unable to resolve it, so don't fall back to host name
-          LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
-          return null;
-        }
-      } else {
-        host = u.getHost();
-        if (host == null) {
-          LOG.warn("Unknown host for url: " + url + ", skipping.");
-          return null;
-        }
-        host = host.toLowerCase();
-      }
-      queueID = proto + "://" + host;
-      return new FetchItem(url, u, datum, queueID);
-    }
-
-    public CrawlDatum getDatum() {
-      return datum;
-    }
-
-    public String getQueueID() {
-      return queueID;
-    }
-
-    public Text getUrl() {
-      return url;
-    }
-    
-    public URL getURL2() {
-      return u;
-    }
-  }
+  private static final Collection<HbaseColumn> COLUMNS =
+    new HashSet<HbaseColumn>();
   
-  /**
-   * This class handles FetchItems which come from the same host ID (be it
-   * a proto/hostname or proto/IP pair). It also keeps track of requests in
-   * progress and elapsed time between requests.
-   */
-  private static class FetchItemQueue {
-    List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
-    Set<FetchItem>  inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
-    AtomicLong nextFetchTime = new AtomicLong();
-    long crawlDelay;
-    long minCrawlDelay;
-    int maxThreads;
-    Configuration conf;
-    
-    public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
-      this.conf = conf;
-      this.maxThreads = maxThreads;
-      this.crawlDelay = crawlDelay;
-      this.minCrawlDelay = minCrawlDelay;
-      // ready to start
-      setEndTime(System.currentTimeMillis() - crawlDelay);
-    }
-    
-    public int getQueueSize() {
-      return queue.size();
-    }
-    
-    public int getInProgressSize() {
-      return inProgress.size();
-    }
-    
-    public void finishFetchItem(FetchItem it, boolean asap) {
-      if (it != null) {
-        inProgress.remove(it);
-        setEndTime(System.currentTimeMillis(), asap);
-      }
-    }
-    
-    public void addFetchItem(FetchItem it) {
-      if (it == null) return;
-      queue.add(it);
-    }
-    
-    public void addInProgressFetchItem(FetchItem it) {
-      if (it == null) return;
-      inProgress.add(it);
-    }
-    
-    public FetchItem getFetchItem() {
-      if (inProgress.size() >= maxThreads) return null;
-      long now = System.currentTimeMillis();
-      if (nextFetchTime.get() > now) return null;
-      FetchItem it = null;
-      if (queue.size() == 0) return null;
-      try {
-        it = queue.remove(0);
-        inProgress.add(it);
-      } catch (Exception e) {
-        LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
-      }
-      return it;
-    }
-    
-    public synchronized void dump() {
-      LOG.info("  maxThreads    = " + maxThreads);
-      LOG.info("  inProgress    = " + inProgress.size());
-      LOG.info("  crawlDelay    = " + crawlDelay);
-      LOG.info("  minCrawlDelay = " + minCrawlDelay);
-      LOG.info("  nextFetchTime = " + nextFetchTime.get());
-      LOG.info("  now           = " + System.currentTimeMillis());
-      for (int i = 0; i < queue.size(); i++) {
-        FetchItem it = queue.get(i);
-        LOG.info("  " + i + ". " + it.url);
-      }
-    }
-    
-    private void setEndTime(long endTime) {
-      setEndTime(endTime, false);
-    }
-    
-    private void setEndTime(long endTime, boolean asap) {
-      if (!asap)
-        nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
-      else
-        nextFetchTime.set(endTime);
-    }
-  }
-  
-  /**
-   * Convenience class - a collection of queues that keeps track of the total
-   * number of items, and provides items eligible for fetching from any queue.
-   */
-  private static class FetchItemQueues {
-    public static final String DEFAULT_ID = "default";
-    Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
-    AtomicInteger totalSize = new AtomicInteger(0);
-    int maxThreads;
-    boolean byIP;
-    long crawlDelay;
-    long minCrawlDelay;
-    Configuration conf;    
-    
-    public FetchItemQueues(Configuration conf) {
-      this.conf = conf;
-      this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
-      // backward-compatible default setting
-      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
-      this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
-      this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
-    }
-    
-    public int getTotalSize() {
-      return totalSize.get();
-    }
-    
-    public int getQueueCount() {
-      return queues.size();
-    }
-    
-    public void addFetchItem(Text url, CrawlDatum datum) {
-      FetchItem it = FetchItem.create(url, datum, byIP);
-      if (it != null) addFetchItem(it);
-    }
-    
-    public void addFetchItem(FetchItem it) {
-      FetchItemQueue fiq = getFetchItemQueue(it.queueID);
-      fiq.addFetchItem(it);
-      totalSize.incrementAndGet();
-    }
-    
-    public void finishFetchItem(FetchItem it) {
-      finishFetchItem(it, false);
-    }
-    
-    public void finishFetchItem(FetchItem it, boolean asap) {
-      FetchItemQueue fiq = queues.get(it.queueID);
-      if (fiq == null) {
-        LOG.warn("Attempting to finish item from unknown queue: " + it);
-        return;
-      }
-      fiq.finishFetchItem(it, asap);
-    }
-    
-    public synchronized FetchItemQueue getFetchItemQueue(String id) {
-      FetchItemQueue fiq = queues.get(id);
-      if (fiq == null) {
-        // initialize queue
-        fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
-        queues.put(id, fiq);
-      }
-      return fiq;
-    }
-    
-    public synchronized FetchItem getFetchItem() {
-      Iterator<Map.Entry<String, FetchItemQueue>> it =
-        queues.entrySet().iterator();
-      while (it.hasNext()) {
-        FetchItemQueue fiq = it.next().getValue();
-        // reap empty queues
-        if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
-          it.remove();
-          continue;
-        }
-        FetchItem fit = fiq.getFetchItem();
-        if (fit != null) {
-          totalSize.decrementAndGet();
-          return fit;
-        }
-      }
-      return null;
-    }
-    
-    public synchronized void dump() {
-      for (String id : queues.keySet()) {
-        FetchItemQueue fiq = queues.get(id);
-        if (fiq.getQueueSize() == 0) continue;
-        LOG.info("* queue: " + id);
-        fiq.dump();
-      }
-    }
+  static {
+    COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA,
+                                Generator.GENERATOR_MARK));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.REPR_URL));
   }
   
   /**
-   * This class feeds the queues with input items, and re-fills them as
-   * items are consumed by FetcherThread-s.
+   * <p>Mapper class for Fetcher.</p>
+   * <p>
+   * This class reads the random integer written by {@link GeneratorHbase} as its key
+   * while outputting the actual key and value arguments through a {@link FetchEntry}
+   * instance.
+   * </p>
+   * <p>
+   * This approach (combined with the use of {@link PartitionUrlByHostHbase})
+   * makes sure that Fetcher is still polite while also randomizing the key order.
+   * If one host has a huge number of URLs in your table while other hosts
+   * have only a few, {@link FetcherReducer} will not get stuck on one host
+   * but will process URLs from other hosts as well.
+   * </p> 
    */
-  private static class QueueFeeder extends Thread {
-    private RecordReader<Text, CrawlDatum> reader;
-    private FetchItemQueues queues;
-    private int size;
-    
-    public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
-        FetchItemQueues queues, int size) {
-      this.reader = reader;
-      this.queues = queues;
-      this.size = size;
-      this.setDaemon(true);
-      this.setName("QueueFeeder");
-    }
-    
-    public void run() {
-      boolean hasMore = true;
-      int cnt = 0;
-      
-      while (hasMore) {
-        int feed = size - queues.getTotalSize();
-        if (feed <= 0) {
-          // queues are full - spin-wait until they have some free space
-          try {
-            Thread.sleep(1000);
-          } catch (Exception e) {};
-          continue;
-        } else {
-          LOG.debug("-feeding " + feed + " input urls ...");
-          while (feed > 0 && hasMore) {
-            try {
-              Text url = new Text();
-              CrawlDatum datum = new CrawlDatum();
-              hasMore = reader.next(url, datum);
-              if (hasMore) {
-                queues.addFetchItem(url, datum);
-                cnt++;
-                feed--;
-              }
-            } catch (IOException e) {
-              LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
-              return;
-            }
-          }
-        }
-      }
-      LOG.info("QueueFeeder finished: total " + cnt + " records.");
-    }
-  }
-  
-  /**
-   * This class picks items from queues and fetches the pages.
-   */
-  private class FetcherThread extends Thread {
-    private Configuration conf;
-    private URLFilters urlFilters;
-    private ScoringFilters scfilters;
-    private ParseUtil parseUtil;
-    private URLNormalizers normalizers;
-    private ProtocolFactory protocolFactory;
-    private long maxCrawlDelay;
-    private boolean byIP;
-    private int maxRedirect;
-    private String reprUrl;
-    private boolean redirecting;
-    private int redirectCount;
-    private boolean ignoreExternalLinks;
-
-    public FetcherThread(Configuration conf) {
-      this.setDaemon(true);                       // don't hang JVM on exit
-      this.setName("FetcherThread");              // use an informative name
-      this.conf = conf;
-      this.urlFilters = new URLFilters(conf);
-      this.scfilters = new ScoringFilters(conf);
-      this.parseUtil = new ParseUtil(conf);
-      this.protocolFactory = new ProtocolFactory(conf);
-      this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
-      this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
-      // backward-compatible default setting
-      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
-      this.maxRedirect = conf.getInt("http.redirect.max", 3);
-      this.ignoreExternalLinks = 
-        conf.getBoolean("db.ignore.external.links", false);
-    }
-
-    public void run() {
-      activeThreads.incrementAndGet(); // count threads
-      
-      FetchItem fit = null;
-      try {
-        
-        while (true) {
-          fit = fetchQueues.getFetchItem();
-          if (fit == null) {
-            if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
-              LOG.debug(getName() + " spin-waiting ...");
-              // spin-wait.
-              spinWaiting.incrementAndGet();
-              try {
-                Thread.sleep(500);
-              } catch (Exception e) {}
-                spinWaiting.decrementAndGet();
-              continue;
-            } else {
-              // all done, finish this thread
-              return;
-            }
-          }
-          lastRequestStart.set(System.currentTimeMillis());
-          Text reprUrlWritable =
-            (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
-          if (reprUrlWritable == null) {
-            reprUrl = fit.url.toString();
-          } else {
-            reprUrl = reprUrlWritable.toString();
-          }
-          try {
-            if (LOG.isInfoEnabled()) { LOG.info("fetching " + fit.url); }
-
-            // fetch the page
-            redirecting = false;
-            redirectCount = 0;
-            do {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("redirectCount=" + redirectCount);
-              }
-              redirecting = false;
-              Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
-              RobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
-              if (!rules.isAllowed(fit.u)) {
-                // unblock
-                fetchQueues.finishFetchItem(fit, true);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Denied by robots.txt: " + fit.url);
-                }
-                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
-                continue;
-              }
-              if (rules.getCrawlDelay() > 0) {
-                if (rules.getCrawlDelay() > maxCrawlDelay) {
-                  // unblock
-                  fetchQueues.finishFetchItem(fit, true);
-                  LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
-                  output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatum.STATUS_FETCH_GONE);
-                  continue;
-                } else {
-                  FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
-                  fiq.crawlDelay = rules.getCrawlDelay();
-                }
-              }
-              ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
-              ProtocolStatus status = output.getStatus();
-              Content content = output.getContent();
-              ParseStatus pstatus = null;
-              // unblock queue
-              fetchQueues.finishFetchItem(fit);
-
-              String urlString = fit.url.toString();
-
-              switch(status.getCode()) {
-                
-              case ProtocolStatus.WOULDBLOCK:
-                // retry ?
-                fetchQueues.addFetchItem(fit);
-                break;
-
-              case ProtocolStatus.SUCCESS:        // got a page
-                pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
-                updateStatus(content.getContent().length);
-                if (pstatus != null && pstatus.isSuccess() &&
-                        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
-                  String newUrl = pstatus.getMessage();
-                  int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
-                  Text redirUrl =
-                    handleRedirect(fit.url, fit.datum,
-                                   urlString, newUrl,
-                                   refreshTime < Fetcher.PERM_REFRESH_TIME,
-                                   Fetcher.CONTENT_REDIR);
-                  if (redirUrl != null) {
-                    CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
-                        fit.datum.getFetchInterval(), fit.datum.getScore());
-                    if (reprUrl != null) {
-                      newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
-                          new Text(reprUrl));
-                    }
-                    fit = FetchItem.create(redirUrl, newDatum, byIP);
-                    if (fit != null) {
-                      FetchItemQueue fiq =
-                        fetchQueues.getFetchItemQueue(fit.queueID);
-                      fiq.addInProgressFetchItem(fit);
-                    } else {
-                      // stop redirecting
-                      redirecting = false;
-                    }
-                  }
-                }
-                break;
-
-              case ProtocolStatus.MOVED:         // redirect
-              case ProtocolStatus.TEMP_MOVED:
-                int code;
-                boolean temp;
-                if (status.getCode() == ProtocolStatus.MOVED) {
-                  code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
-                  temp = false;
-                } else {
-                  code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
-                  temp = true;
-                }
-                output(fit.url, fit.datum, content, status, code);
-                String newUrl = status.getMessage();
-                Text redirUrl =
-                  handleRedirect(fit.url, fit.datum,
-                                 urlString, newUrl, temp,
-                                 Fetcher.PROTOCOL_REDIR);
-                if (redirUrl != null) {
-                  CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
-                      fit.datum.getFetchInterval(), fit.datum.getScore());
-                  if (reprUrl != null) {
-                    newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
-                        new Text(reprUrl));
-                  }
-                  fit = FetchItem.create(redirUrl, newDatum, byIP);
-                  if (fit != null) {
-                    FetchItemQueue fiq =
-                      fetchQueues.getFetchItemQueue(fit.queueID);
-                    fiq.addInProgressFetchItem(fit);
-                  } else {
-                    // stop redirecting
-                    redirecting = false;
-                  }
-                } else {
-                  // stop redirecting
-                  redirecting = false;
-                }
-                break;
-
-              case ProtocolStatus.EXCEPTION:
-                logError(fit.url, status.getMessage());
-                /* FALLTHROUGH */
-              case ProtocolStatus.RETRY:          // retry
-              case ProtocolStatus.BLOCKED:
-                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
-                break;
-                
-              case ProtocolStatus.GONE:           // gone
-              case ProtocolStatus.NOTFOUND:
-              case ProtocolStatus.ACCESS_DENIED:
-              case ProtocolStatus.ROBOTS_DENIED:
-                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
-                break;
-
-              case ProtocolStatus.NOTMODIFIED:
-                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
-                break;
-
-              default:
-                if (LOG.isWarnEnabled()) {
-                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
-                }
-                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
-              }
-
-              if (redirecting && redirectCount >= maxRedirect) {
-                fetchQueues.finishFetchItem(fit);
-                if (LOG.isInfoEnabled()) {
-                  LOG.info(" - redirect count exceeded " + fit.url);
-                }
-                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatum.STATUS_FETCH_GONE);
-              }
-
-            } while (redirecting && (redirectCount < maxRedirect));
-            
-          } catch (Throwable t) {                 // unexpected exception
-            // unblock
-            fetchQueues.finishFetchItem(fit);
-            logError(fit.url, t.toString());
-            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED, CrawlDatum.STATUS_FETCH_RETRY);
-          }
-        }
-
-      } catch (Throwable e) {
-        if (LOG.isFatalEnabled()) {
-          e.printStackTrace(LogUtil.getFatalStream(LOG));
-          LOG.fatal("fetcher caught:"+e.toString());
-        }
-      } finally {
-        if (fit != null) fetchQueues.finishFetchItem(fit);
-        activeThreads.decrementAndGet(); // count threads
-        LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
-      }
-    }
+  public static class FetcherMapper
+  extends TableMapper<ImmutableBytesWritable, FetchEntry> {
 
-    private Text handleRedirect(Text url, CrawlDatum datum,
-                                String urlString, String newUrl,
-                                boolean temp, String redirType)
-    throws MalformedURLException, URLFilterException {
-      newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
-      newUrl = urlFilters.filter(newUrl);
-      
-      if (ignoreExternalLinks) {
-        try {
-          String origHost = new URL(urlString).getHost().toLowerCase();
-          String newHost = new URL(newUrl).getHost().toLowerCase();
-          if (!origHost.equals(newHost)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(" - ignoring redirect " + redirType + " from " +
-                          urlString + " to " + newUrl +
-                          " because external links are ignored");
-            }
-            return null;
-          }
-        } catch (MalformedURLException e) { }
-      }
-      
-      if (newUrl != null && !newUrl.equals(urlString)) {
-        reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
-        url = new Text(newUrl);
-        if (maxRedirect > 0) {
-          redirecting = true;
-          redirectCount++;
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(" - " + redirType + " redirect to " +
-                url + " (fetching now)");
-          }
-          return url;
-        } else {
-          CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
-              datum.getFetchInterval());
-          if (reprUrl != null) {
-            newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
-                new Text(reprUrl));
-          }
-          output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(" - " + redirType + " redirect to " +
-                url + " (fetching later)");
-          }
-          return null;
-        }
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(" - " + redirType + " redirect skipped: " +
-              (newUrl != null ? "to same url" : "filtered"));
-        }
-        return null;
-      }
-    }
-
-    private void logError(Text url, String message) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("fetch of " + url + " failed with: " + message);
-      }
-      errors.incrementAndGet();
+    @Override
+    protected void map(ImmutableBytesWritable key, Result value,
+        Context context) throws IOException, InterruptedException {
+      byte[] outKeyRaw =
+        value.getValue(WebTableColumns.METADATA, Generator.GENERATOR_MARK);
+      context.write(new ImmutableBytesWritable(outKeyRaw), new FetchEntry(key, value));
     }
-
-    private ParseStatus output(Text key, CrawlDatum datum,
-                        Content content, ProtocolStatus pstatus, int status) {
-
-      datum.setStatus(status);
-      datum.setFetchTime(System.currentTimeMillis());
-      if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
-
-      ParseResult parseResult = null;
-      if (content != null) {
-        Metadata metadata = content.getMetadata();
-        // add segment to metadata
-        metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
-        // add score to content metadata so that ParseSegment can pick it up.
-        try {
-          scfilters.passScoreBeforeParsing(key, datum, content);
-        } catch (Exception e) {
-          if (LOG.isWarnEnabled()) {
-            e.printStackTrace(LogUtil.getWarnStream(LOG));
-            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
-          }
-        }
-        /* Note: Fetcher will only follow meta-redirects coming from the
-         * original URL. */ 
-        if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
-          try {
-            parseResult = this.parseUtil.parse(content);
-          } catch (Exception e) {
-            LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
-          }
-
-          if (parseResult == null) {
-            byte[] signature = 
-              SignatureFactory.getSignature(getConf()).calculate(content, 
-                  new ParseStatus().getEmptyParse(conf));
-            datum.setSignature(signature);
-          }
-        }
-        
-        /* Store status code in content So we can read this value during 
-         * parsing (as a separate job) and decide to parse or not.
-         */
-        content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
-      }
-
-      try {
-        output.collect(key, new NutchWritable(datum));
-        if (content != null && storingContent)
-          output.collect(key, new NutchWritable(content));
-        if (parseResult != null) {
-          for (Entry<Text, Parse> entry : parseResult) {
-            Text url = entry.getKey();
-            Parse parse = entry.getValue();
-            ParseStatus parseStatus = parse.getData().getStatus();
-            
-            if (!parseStatus.isSuccess()) {
-              LOG.warn("Error parsing: " + key + ": " + parseStatus);
-              parse = parseStatus.getEmptyParse(getConf());
-            }
-
-            // Calculate page signature. For non-parsing fetchers this will
-            // be done in ParseSegment
-            byte[] signature = 
-              SignatureFactory.getSignature(getConf()).calculate(content, parse);
-            // Ensure segment name and score are in parseData metadata
-            parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, 
-                segmentName);
-            parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, 
-                StringUtil.toHexString(signature));
-            // Pass fetch time to content meta
-            parse.getData().getContentMeta().set(Nutch.FETCH_TIME_KEY,
-                Long.toString(datum.getFetchTime()));
-            if (url.equals(key))
-              datum.setSignature(signature);
-            try {
-              scfilters.passScoreAfterParsing(url, content, parse);
-            } catch (Exception e) {
-              if (LOG.isWarnEnabled()) {
-                e.printStackTrace(LogUtil.getWarnStream(LOG));
-                LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
-              }
-            }
-            output.collect(url, new NutchWritable(
-                    new ParseImpl(new ParseText(parse.getText()), 
-                                  parse.getData(), parse.isCanonical())));
-          }
-        }
-      } catch (IOException e) {
-        if (LOG.isFatalEnabled()) {
-          e.printStackTrace(LogUtil.getFatalStream(LOG));
-          LOG.fatal("fetcher caught:"+e.toString());
-        }
-      }
-
-      // return parse status if it exits
-      if (parseResult != null && !parseResult.isEmpty()) {
-        Parse p = parseResult.get(content.getUrl());
-        if (p != null) {
-          return p.getData().getStatus();
-        }
-      }
-      return null;
-    }
-    
   }
-
-  public Fetcher() { super(null); }
-
-  public Fetcher(Configuration conf) { super(conf); }
-
-  private void updateStatus(int bytesInPage) throws IOException {
-    pages.incrementAndGet();
-    bytes.addAndGet(bytesInPage);
-  }
-
   
-  private void reportStatus() throws IOException {
-    String status;
-    long elapsed = (System.currentTimeMillis() - start)/1000;
-    status = activeThreads + " threads, " +
-      pages+" pages, "+errors+" errors, "
-      + Math.round(((float)pages.get()*10)/elapsed)/10.0+" pages/s, "
-      + Math.round(((((float)bytes.get())*8)/1024)/elapsed)+" kb/s, ";
-    reporter.setStatus(status);
-  }
-
-  public void configure(JobConf job) {
-    setConf(job);
-
-    this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
-    this.storingContent = isStoringContent(job);
-    this.parsing = isParsing(job);
-
-//    if (job.getBoolean("fetcher.verbose", false)) {
-//      LOG.setLevel(Level.FINE);
-//    }
-  }
+  public static final Log LOG = LogFactory.getLog(Fetcher.class);
 
-  public void close() {}
+  private Configuration conf;
 
-  public static boolean isParsing(Configuration conf) {
-    return conf.getBoolean("fetcher.parse", true);
+  @Override
+  public Configuration getConf() {
+    return conf;
   }
 
-  public static boolean isStoringContent(Configuration conf) {
-    return conf.getBoolean("fetcher.store.content", true);
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
   }
 
-  public void run(RecordReader<Text, CrawlDatum> input,
-      OutputCollector<Text, NutchWritable> output,
-                  Reporter reporter) throws IOException {
-
-    this.output = output;
-    this.reporter = reporter;
-    this.fetchQueues = new FetchItemQueues(getConf());
-
-    int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
-    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: threads: " + threadCount); }
-
-    feeder = new QueueFeeder(input, fetchQueues, threadCount * 50);
-    //feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
-    feeder.start();
-
-    // set non-blocking & no-robots mode for HTTP protocol plugins.
-    getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
-    getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
+  private void fetch(String table, int threads, boolean restart) throws Exception {
+    LOG.info("Fetcher: starting");
+    LOG.info("Fetcher: table: " + table);
     
-    for (int i = 0; i < threadCount; i++) {       // spawn threads
-      new FetcherThread(getConf()).start();
+    if (threads > 0) {
+      getConf().setInt("fetcher.threads.fetch", threads);
     }
-
-    // select a timeout that avoids a task timeout
-    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
-
-    do {                                          // wait for threads to exit
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {}
-
-      reportStatus();
-      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
-          + ", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
-
-      if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
-        fetchQueues.dump();
-      }
-      // some requests seem to hang, despite all intentions
-      if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Aborting with "+activeThreads+" hung threads.");
-        }
-        return;
-      }
-
-    } while (activeThreads.get() > 0);
-    LOG.info("-activeThreads=" + activeThreads);
     
-  }
-
-  public void fetch(Path segment, int threads, boolean parsing)
-    throws IOException {
-
-    checkConfiguration();
-
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Fetcher: starting");
-      LOG.info("Fetcher: segment: " + segment);
+    Job job = new NutchJob(getConf(), "fetch " + table);
+    Scan scan = TableUtil.createScanFromColumns(COLUMNS);
+    List<Filter> filters = new ArrayList<Filter>();
+    if (!restart) {
+      filters.add(new ValueFilter(WebTableColumns.METADATA, FETCH_MARK,
+          CompareOp.NOT_EQUAL, TableUtil.YES_VAL, false));
     }
-
-    JobConf job = new NutchJob(getConf());
-    job.setJobName("fetch " + segment);
-
-    job.setInt("fetcher.threads.fetch", threads);
-    job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
-    job.setBoolean("fetcher.parse", parsing);
-
-    // for politeness, don't permit parallel execution of a single task
-    job.setSpeculativeExecution(false);
-
-    FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
-    job.setInputFormat(InputFormat.class);
-
-    job.setMapRunnerClass(Fetcher.class);
-
-    FileOutputFormat.setOutputPath(job, segment);
-    job.setOutputFormat(FetcherOutputFormat.class);
-    job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(NutchWritable.class);
-
-    JobClient.runJob(job);
-    if (LOG.isInfoEnabled()) { LOG.info("Fetcher: done"); }
+    filters.add(new ValueFilter(WebTableColumns.METADATA, Generator.GENERATOR_MARK,
+        CompareOp.GREATER_OR_EQUAL, new byte[] { (byte)0 }, true));
+    FilterList filterList = new FilterList(Operator.MUST_PASS_ALL, filters);
+    scan.setFilter(filterList);
+    TableMapReduceUtil.initTableMapperJob(table, scan, FetcherMapper.class,
+        ImmutableBytesWritable.class, FetchEntry.class, job);
+    TableMapReduceUtil.initTableReducerJob(table,
+        FetcherReducer.class, job, PartitionUrlByHost.class);
+    
+    job.waitForCompletion(true);
+    
+    LOG.info("Fetcher: done");
   }
-
-
-  /** Run the fetcher. */
-  public static void main(String[] args) throws Exception {
-
-    String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";
+  
+  @Override
+  public int run(String[] args) throws Exception {
+    final String usage = "Usage: FetcherHbase <webtable> [-threads n] [-restart]";
 
     if (args.length < 1) {
       System.err.println(usage);
       System.exit(-1);
     }
-      
-    Path segment = new Path(args[0]);
 
-    Configuration conf = NutchConfiguration.create();
+    final String table = args[0];
 
-    int threads = conf.getInt("fetcher.threads.fetch", 10);
-    boolean parsing = true;
+    int threads = -1;
+    boolean restart = false;
 
-    for (int i = 1; i < args.length; i++) {       // parse command line
-      if (args[i].equals("-threads")) {           // found -threads option
+    for (int i = 1; i < args.length; i++) {
+      if ("-threads".equals(args[i])) {
+        // found -threads option
         threads =  Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-noParsing")) parsing = false;
+      } else if ("-restart".equals(args[i])) {
+        restart = true;
+      }
     }
 
-    conf.setInt("fetcher.threads.fetch", threads);
-    if (!parsing) {
-      conf.setBoolean("fetcher.parse", parsing);
-    }
-    Fetcher fetcher = new Fetcher(conf);          // make a Fetcher
-    
-    fetcher.fetch(segment, threads, parsing);              // run the Fetcher
+    fetch(table, threads, restart);              // run the Fetcher
 
+    return 0;
   }
 
-  private void checkConfiguration() {
-
-    // ensure that a value has been set for the agent name and that that
-    // agent name is the first value in the agents we advertise for robot
-    // rules parsing
-    String agentName = getConf().get("http.agent.name");
-    if (agentName == null || agentName.trim().length() == 0) {
-      String message = "Fetcher: No agents listed in 'http.agent.name'"
-          + " property.";
-      if (LOG.isFatalEnabled()) {
-        LOG.fatal(message);
-      }
-      throw new IllegalArgumentException(message);
-    } else {
-
-      // get all of the agents that we advertise
-      String agentNames = getConf().get("http.robots.agents");
-      StringTokenizer tok = new StringTokenizer(agentNames, ",");
-      ArrayList<String> agents = new ArrayList<String>();
-      while (tok.hasMoreTokens()) {
-        agents.add(tok.nextToken().trim());
-      }
-
-      // if the first one is not equal to our agent name, log fatal and throw
-      // an exception
-      if (!(agents.get(0)).equalsIgnoreCase(agentName)) {
-        String message = "Fetcher: Your 'http.agent.name' value should be "
-            + "listed first in 'http.robots.agents' property.";
-        if (LOG.isWarnEnabled()) {
-          LOG.warn(message);
-        }
-      }
-    }
+  public static void main(String[] args) throws Exception {
+    final int res = ToolRunner.run(NutchConfiguration.create(),
+        new Fetcher(), args);
+    System.exit(res);
   }
-
 }
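
The FetcherMapper javadoc above relies on a host-based partitioner (PartitionUrlByHost, whose source is not included in this part of the commit) to keep fetching polite: every URL of a given host has to land in the same reduce partition, while the random integer written by the generator (read from the GENERATOR_MARK metadata) is used as the map output key and randomizes the order in which hosts are served. Below is a minimal sketch of that idea, assuming simplified Text key/value types and a plain URL string as the map output value; the class name and signature are illustrative only and do not match the ImmutableBytesWritable/FetchEntry types wired up in fetch() above.

import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Sketch only: route map output to reducers by the host of the URL carried
 * in the value, so all URLs of one host share a reducer (and its per-host
 * fetch queues), while the randomized key still shuffles the ordering.
 */
public class HostPartitionerSketch extends Partitioner<Text, Text> {

  @Override
  public int getPartition(Text key, Text url, int numReduceTasks) {
    String host;
    try {
      String h = new URL(url.toString()).getHost();
      host = (h == null || h.length() == 0) ? url.toString() : h.toLowerCase();
    } catch (MalformedURLException e) {
      host = url.toString(); // unparsable URL: fall back to the raw value
    }
    // Same host -> same partition, regardless of the randomized key.
    return (host.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

From run() above, the job takes <webtable> [-threads n] [-restart] (the usage string calls the command FetcherHbase). Without -restart, the scan adds a ValueFilter that skips rows whose fetch mark is already set; with -restart that filter is dropped, so previously fetched rows are scanned again.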

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherOutput.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherOutput.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherOutput.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherOutput.java Sun Aug 16 22:25:12 2009
@@ -17,65 +17,16 @@
 
 package org.apache.nutch.fetcher;
 
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
 
-import org.apache.hadoop.io.*;
-import org.apache.nutch.crawl.CrawlDatum;
-import org.apache.nutch.protocol.Content;
-import org.apache.nutch.parse.*;
+import org.apache.hadoop.io.Writable;
 
 /* An entry in the fetcher's output. */
 public final class FetcherOutput implements Writable {
-  private CrawlDatum crawlDatum;
-  private Content content;
-  private ParseImpl parse;
-
-  public FetcherOutput() {}
-
-  public FetcherOutput(CrawlDatum crawlDatum, Content content,
-                       ParseImpl parse) {
-    this.crawlDatum = crawlDatum;
-    this.content = content;
-    this.parse = parse;
-  }
-
-  public final void readFields(DataInput in) throws IOException {
-    this.crawlDatum = CrawlDatum.read(in);
-    this.content = in.readBoolean() ? Content.read(in) : null;
-    this.parse = in.readBoolean() ? ParseImpl.read(in) : null;
-  }
-
-  public final void write(DataOutput out) throws IOException {
-    crawlDatum.write(out);
-
-    out.writeBoolean(content != null);
-    if (content != null) {
-      content.write(out);
-    }
-
-    out.writeBoolean(parse != null);
-    if (parse != null) {
-      parse.write(out);
-    }
-  }
-
-  public CrawlDatum getCrawlDatum() { return crawlDatum; }
-  public Content getContent() { return content; }
-  public ParseImpl getParse() { return parse; }
-
-  public boolean equals(Object o) {
-    if (!(o instanceof FetcherOutput))
-      return false;
-    FetcherOutput other = (FetcherOutput)o;
-    return
-      this.crawlDatum.equals(other.crawlDatum) &&
-      this.content.equals(other.content);
-  }
-
-  public String toString() {
-    StringBuffer buffer = new StringBuffer();
-    buffer.append("CrawlDatum: " + crawlDatum+"\n" );
-    return buffer.toString();
-  }
 
+  public final void readFields(DataInput in) throws IOException { }
+
+  public final void write(DataOutput out) throws IOException { }
 }

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/FetcherReducer.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,663 @@
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.nutch.crawl.CrawlDatumHbase;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.Protocol;
+import org.apache.nutch.protocol.ProtocolFactory;
+import org.apache.nutch.protocol.ProtocolOutput;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.protocol.RobotRules;
+import org.apache.nutch.util.LogUtil;
+import org.apache.nutch.util.URLUtil;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+import org.apache.nutch.util.hbase.WebTableRow;
+
+public class FetcherReducer
+extends TableReducer<ImmutableBytesWritable, FetchEntry, ImmutableBytesWritable> {
+
+  public static final Log LOG = Fetcher.LOG;
+
+  private final AtomicInteger activeThreads = new AtomicInteger(0);
+  private final AtomicInteger spinWaiting = new AtomicInteger(0);
+
+  private final long start = System.currentTimeMillis(); // start time of fetcher run
+  private final AtomicLong lastRequestStart = new AtomicLong(start);
+
+  private final AtomicLong bytes = new AtomicLong(0);        // total bytes fetched
+  private final AtomicInteger pages = new AtomicInteger(0);  // total pages fetched
+  private final AtomicInteger errors = new AtomicInteger(0); // total pages errored
+
+  private QueueFeeder feeder;
+
+  private List<FetcherThread> fetcherThreads = new ArrayList<FetcherThread>();
+
+  private FetchItemQueues fetchQueues;
+  
+  private static final ImmutableBytesWritable REDUCE_KEY =
+    new ImmutableBytesWritable(TableUtil.YES_VAL);
+
+  /**
+   * This class describes the item to be fetched.
+   */
+  private static class FetchItem {
+    WebTableRow row;
+    String queueID;
+    String url;
+    URL u;
+
+    public FetchItem(String url, WebTableRow row, URL u, String queueID) {
+      this.row = row;
+      this.url = url;
+      this.u = u;
+      this.queueID = queueID;
+    }
+
+    /** Create an item. Queue id will be created based on <code>byIP</code>
+     * argument, either as a protocol + hostname pair, or protocol + IP
+     * address pair.
+     */
+    public static FetchItem create(String url, WebTableRow row, boolean byIP) {
+      String queueID;
+      URL u = null;
+      try {
+        u = new URL(url);
+      } catch (final Exception e) {
+        LOG.warn("Cannot parse url: " + url, e);
+        return null;
+      }
+      final String proto = u.getProtocol().toLowerCase();
+      String host;
+      if (byIP) {
+        try {
+          final InetAddress addr = InetAddress.getByName(u.getHost());
+          host = addr.getHostAddress();
+        } catch (final UnknownHostException e) {
+          // unable to resolve it, so don't fall back to host name
+          LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
+          return null;
+        }
+      } else {
+        host = u.getHost();
+        if (host == null) {
+          LOG.warn("Unknown host for url: " + url + ", skipping.");
+          return null;
+        }
+        host = host.toLowerCase();
+      }
+      queueID = proto + "://" + host;
+      return new FetchItem(url, row, u, queueID);
+    }
+
+  }
+
+  /**
+   * This class handles FetchItems which come from the same host ID (be it
+   * a proto/hostname or proto/IP pair). It also keeps track of requests in
+   * progress and elapsed time between requests.
+   */
+  private static class FetchItemQueue {
+    List<FetchItem> queue = Collections.synchronizedList(new LinkedList<FetchItem>());
+    Set<FetchItem>  inProgress = Collections.synchronizedSet(new HashSet<FetchItem>());
+    AtomicLong nextFetchTime = new AtomicLong();
+    long crawlDelay;
+    long minCrawlDelay;
+    int maxThreads;
+
+    public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay, long minCrawlDelay) {
+      this.maxThreads = maxThreads;
+      this.crawlDelay = crawlDelay;
+      this.minCrawlDelay = minCrawlDelay;
+      // ready to start
+      setEndTime(System.currentTimeMillis() - crawlDelay);
+    }
+
+    public int getQueueSize() {
+      return queue.size();
+    }
+
+    public int getInProgressSize() {
+      return inProgress.size();
+    }
+
+    public void finishFetchItem(FetchItem it, boolean asap) {
+      if (it != null) {
+        inProgress.remove(it);
+        setEndTime(System.currentTimeMillis(), asap);
+      }
+    }
+
+    public void addFetchItem(FetchItem it) {
+      if (it == null) return;
+      queue.add(it);
+    }
+
+    @SuppressWarnings("unused")
+    public void addInProgressFetchItem(FetchItem it) {
+      if (it == null) return;
+      inProgress.add(it);
+    }
+
+    public FetchItem getFetchItem() {
+      if (inProgress.size() >= maxThreads) return null;
+      final long now = System.currentTimeMillis();
+      if (nextFetchTime.get() > now) return null;
+      FetchItem it = null;
+      if (queue.size() == 0) return null;
+      try {
+        it = queue.remove(0);
+        inProgress.add(it);
+      } catch (final Exception e) {
+        LOG.error("Cannot remove FetchItem from queue or cannot add it to inProgress queue", e);
+      }
+      return it;
+    }
+
+    public synchronized void dump() {
+      LOG.info("  maxThreads    = " + maxThreads);
+      LOG.info("  inProgress    = " + inProgress.size());
+      LOG.info("  crawlDelay    = " + crawlDelay);
+      LOG.info("  minCrawlDelay = " + minCrawlDelay);
+      LOG.info("  nextFetchTime = " + nextFetchTime.get());
+      LOG.info("  now           = " + System.currentTimeMillis());
+      for (int i = 0; i < queue.size(); i++) {
+        final FetchItem it = queue.get(i);
+        LOG.info("  " + i + ". " + it.url);
+      }
+    }
+
+    private void setEndTime(long endTime) {
+      setEndTime(endTime, false);
+    }
+
+    private void setEndTime(long endTime, boolean asap) {
+      if (!asap)
+        nextFetchTime.set(endTime + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
+      else
+        nextFetchTime.set(endTime);
+    }
+  }
+
+  /**
+   * Convenience class - a collection of queues that keeps track of the total
+   * number of items, and provides items eligible for fetching from any queue.
+   */
+  private static class FetchItemQueues {
+    @SuppressWarnings("unused")
+    public static final String DEFAULT_ID = "default";
+    Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
+    AtomicInteger totalSize = new AtomicInteger(0);
+    int maxThreads;
+    boolean byIP;
+    long crawlDelay;
+    long minCrawlDelay;
+    Configuration conf;
+
+    public FetchItemQueues(Configuration conf) {
+      this.conf = conf;
+      this.maxThreads = conf.getInt("fetcher.threads.per.host", 1);
+      // backward-compatible default setting
+      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", false);
+      this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
+      this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay", 0.0f) * 1000);
+    }
+
+    public int getTotalSize() {
+      return totalSize.get();
+    }
+
+    public int getQueueCount() {
+      return queues.size();
+    }
+
+    public void addFetchItem(String url, WebTableRow row) {
+      final FetchItem it = FetchItem.create(url, row, byIP);
+      if (it != null) addFetchItem(it);
+    }
+
+    public synchronized void addFetchItem(FetchItem it) {
+      final FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+      fiq.addFetchItem(it);
+      totalSize.incrementAndGet();
+    }
+
+    public void finishFetchItem(FetchItem it) {
+      finishFetchItem(it, false);
+    }
+
+    public void finishFetchItem(FetchItem it, boolean asap) {
+      final FetchItemQueue fiq = queues.get(it.queueID);
+      if (fiq == null) {
+        LOG.warn("Attempting to finish item from unknown queue: " + it);
+        return;
+      }
+      fiq.finishFetchItem(it, asap);
+    }
+
+    public synchronized FetchItemQueue getFetchItemQueue(String id) {
+      FetchItemQueue fiq = queues.get(id);
+      if (fiq == null) {
+        // initialize queue
+        fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+        queues.put(id, fiq);
+      }
+      return fiq;
+    }
+
+    public synchronized FetchItem getFetchItem() {
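+      // walk the queues, reaping empty ones, and hand out the first item that is eligible now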
+      final Iterator<Map.Entry<String, FetchItemQueue>> it =
+        queues.entrySet().iterator();
+      while (it.hasNext()) {
+        final FetchItemQueue fiq = it.next().getValue();
+        // reap empty queues
+        if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+          it.remove();
+          continue;
+        }
+        final FetchItem fit = fiq.getFetchItem();
+        if (fit != null) {
+          totalSize.decrementAndGet();
+
+          return fit;
+        }
+      }
+      return null;
+    }
+
+    public synchronized void dump() {
+      for (final String id : queues.keySet()) {
+        final FetchItemQueue fiq = queues.get(id);
+        if (fiq.getQueueSize() == 0) continue;
+        LOG.info("* queue: " + id);
+        fiq.dump();
+      }
+    }
+  }
+
+  /**
+   * This class picks items from queues and fetches the pages.
+   */
+  private class FetcherThread extends Thread {
+    private final URLFilters urlFilters;
+    private final URLNormalizers normalizers;
+    private final ProtocolFactory protocolFactory;
+    private final long maxCrawlDelay;
+    @SuppressWarnings("unused")
+    private final boolean byIP;
+    private final int maxRedirect;
+    private String reprUrl;
+    private boolean redirecting;
+    private int redirectCount;
+    private Context context;
+
+    public FetcherThread(Context context, int num) {
+      this.setDaemon(true);                       // don't hang JVM on exit
+      this.setName("FetcherThread" + num);        // use an informative name
+      this.context = context;
+      Configuration conf = context.getConfiguration();
+      this.urlFilters = new URLFilters(conf);
+      this.protocolFactory = new ProtocolFactory(conf);
+      this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
+      this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+      // backward-compatible default setting
+      this.byIP = conf.getBoolean("fetcher.threads.per.host.by.ip", true);
+      this.maxRedirect = conf.getInt("http.redirect.max", 3);
+    }
+
+    @Override
+    public void run() {
+      activeThreads.incrementAndGet(); // count threads
+
+      FetchItem fit = null;
+      try {
+
+        while (true) {
+          fit = fetchQueues.getFetchItem();
+          if (fit == null) {
+            if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(getName() + " fetchQueues.getFetchItem() was null, spin-waiting ...");
+              }
+              // spin-wait.
+              spinWaiting.incrementAndGet();
+              try {
+                Thread.sleep(500);
+              } catch (final Exception e) {}
+              spinWaiting.decrementAndGet();
+              continue;
+            } else {
+              // all done, finish this thread
+              return;
+            }
+          }
+          lastRequestStart.set(System.currentTimeMillis());
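+          // prefer the representative URL stored on the row, falling back to the fetch URL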
+          if (!fit.row.hasColumn(WebTableColumns.REPR_URL, null)) {
+            reprUrl = fit.url.toString();
+          } else {
+            reprUrl = fit.row.getReprUrl();
+          }
+          try {
+            LOG.info("fetching " + fit.url);
+
+            // fetch the page
+            redirecting = false;
+            redirectCount = 0;
+            do {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("redirectCount=" + redirectCount);
+              }
+              redirecting = false;
+              final Protocol protocol = this.protocolFactory.getProtocol(fit.url);
+              final RobotRules rules = protocol.getRobotRules(fit.url, fit.row);
+              if (!rules.isAllowed(fit.u)) {
+                // unblock
+                fetchQueues.finishFetchItem(fit, true);
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Denied by robots.txt: " + fit.url);
+                }
+                output(fit, null, ProtocolStatus.STATUS_ROBOTS_DENIED,
+                    CrawlDatumHbase.STATUS_GONE);
+                continue;
+              }
+              if (rules.getCrawlDelay() > 0) {
+                if (rules.getCrawlDelay() > maxCrawlDelay) {
+                  // unblock
+                  fetchQueues.finishFetchItem(fit, true);
+                  LOG.debug("Crawl-Delay for " + fit.url + " too long (" + rules.getCrawlDelay() + "), skipping");
+                  output(fit, null, ProtocolStatus.STATUS_ROBOTS_DENIED, CrawlDatumHbase.STATUS_GONE);
+                  continue;
+                } else {
+                  final FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
+                  fiq.crawlDelay = rules.getCrawlDelay();
+                }
+              }
+              final ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.row);
+              final ProtocolStatus status = output.getStatus();
+              final Content content = output.getContent();
+              // unblock queue
+              fetchQueues.finishFetchItem(fit);
+
+              switch(status.getCode()) {
+
+              case ProtocolStatus.WOULDBLOCK:
+                // retry ?
+                fetchQueues.addFetchItem(fit);
+                break;
+
+              case ProtocolStatus.SUCCESS:        // got a page
+                output(fit, content, status, CrawlDatumHbase.STATUS_FETCHED);
+                updateStatus(content.getContent().length);
+                break;
+
+              case ProtocolStatus.MOVED:         // redirect
+              case ProtocolStatus.TEMP_MOVED:
+                byte code;
+                boolean temp;
+                if (status.getCode() == ProtocolStatus.MOVED) {
+                  code = CrawlDatumHbase.STATUS_REDIR_PERM;
+                  temp = false;
+                } else {
+                  code = CrawlDatumHbase.STATUS_REDIR_TEMP;
+                  temp = true;
+                }
+                output(fit, content, status, code);
+                final String newUrl = status.getMessage();
+                handleRedirect(fit.url, newUrl, temp, Fetcher.PROTOCOL_REDIR);
+                redirecting = false;
+                break;
+              case ProtocolStatus.EXCEPTION:
+                logError(fit.url, status.getMessage());
+                /* FALLTHROUGH */
+              case ProtocolStatus.RETRY:          // retry
+              case ProtocolStatus.BLOCKED:
+                output(fit, null, status, CrawlDatumHbase.STATUS_RETRY);
+                break;
+
+              case ProtocolStatus.GONE:           // gone
+              case ProtocolStatus.NOTFOUND:
+              case ProtocolStatus.ACCESS_DENIED:
+              case ProtocolStatus.ROBOTS_DENIED:
+                output(fit, null, status, CrawlDatumHbase.STATUS_GONE);
+                break;
+
+              case ProtocolStatus.NOTMODIFIED:
+                output(fit, null, status, CrawlDatumHbase.STATUS_NOTMODIFIED);
+                break;
+
+              default:
+                if (LOG.isWarnEnabled()) {
+                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+                }
+                output(fit, null, status, CrawlDatumHbase.STATUS_RETRY);
+              }
+
+              if (redirecting && redirectCount >= maxRedirect) {
+                fetchQueues.finishFetchItem(fit);
+                if (LOG.isInfoEnabled()) {
+                  LOG.info(" - redirect count exceeded " + fit.url);
+                }
+                output(fit, null, ProtocolStatus.STATUS_REDIR_EXCEEDED, CrawlDatumHbase.STATUS_GONE);
+              }
+
+            } while (redirecting && (redirectCount < maxRedirect));
+
+          } catch (final Throwable t) {                 // unexpected exception
+            // unblock
+            fetchQueues.finishFetchItem(fit);
+            t.printStackTrace();
+            logError(fit.url, t.toString());
+            output(fit, null, ProtocolStatus.STATUS_FAILED, CrawlDatumHbase.STATUS_RETRY);
+          }
+        }
+
+      } catch (final Throwable e) {
+        if (LOG.isFatalEnabled()) {
+          e.printStackTrace(LogUtil.getFatalStream(LOG));
+          LOG.fatal("fetcher caught:"+e.toString());
+        }
+      } finally {
+        if (fit != null) fetchQueues.finishFetchItem(fit);
+        activeThreads.decrementAndGet(); // count threads
+        LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
+      }
+    }
+
+    private void handleRedirect(String url, String newUrl,
+        boolean temp, String redirType)
+    throws URLFilterException, IOException, InterruptedException {
+      newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+      newUrl = urlFilters.filter(newUrl);
+      if (newUrl == null || newUrl.equals(url)) {
+        return;
+      }
+      reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
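+      // record the redirect target as a new row so it can be fetched in a later round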
+      final String reversedNewUrl = TableUtil.reverseUrl(newUrl);
+      // TODO: Find a way to use MutableWebTableRow here
+      Put put = new Put(Bytes.toBytes(reversedNewUrl));
+      if (!reprUrl.equals(url)) {
+        put.add(WebTableColumns.REPR_URL, null, Bytes.toBytes(reprUrl));
+      }
+      put.add(WebTableColumns.METADATA,
+              Fetcher.REDIRECT_DISCOVERED, TableUtil.YES_VAL);
+      context.write(REDUCE_KEY, put);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(" - " + redirType + " redirect to " +
+            reprUrl + " (fetching later)");
+      }
+
+    }
+
+    private void updateStatus(int bytesInPage) throws IOException {
+      pages.incrementAndGet();
+      bytes.addAndGet(bytesInPage);
+    }
+
+    private void output(FetchItem fit, Content content,
+        ProtocolStatus pstatus, byte status)
+    throws IOException, InterruptedException {
+      fit.row.setStatus(status);
+      final long prevFetchTime = fit.row.getFetchTime();
+      fit.row.setPrevFetchTime(prevFetchTime);
+      fit.row.setFetchTime(System.currentTimeMillis());
+      if (pstatus != null) {
+        fit.row.setProtocolStatus(pstatus);
+      }
+
+      if (content != null) {
+        fit.row.setContent(content.getContent());
+        fit.row.setContentType(content.getContentType());
+        fit.row.setBaseUrl(content.getBaseUrl());
+      }
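+      // mark the row as fetched and emit the accumulated row mutation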
+      fit.row.putMeta(Fetcher.FETCH_MARK, TableUtil.YES_VAL);
+      fit.row.makeRowMutation().writeToContext(REDUCE_KEY, context);
+    }
+
+    private void logError(String url, String message) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("fetch of " + url + " failed with: " + message);
+      }
+      errors.incrementAndGet();
+    }
+  }
+
+  /**
+   * This class feeds the queues with input items, and re-fills them as
+   * items are consumed by FetcherThread-s.
+   */
+  private static class QueueFeeder extends Thread {
+    private final Context context;
+    private final FetchItemQueues queues;
+    private final int size;
+    private Iterator<FetchEntry> currentIter;
+    boolean hasMore;
+
+    public QueueFeeder(Context context,
+        FetchItemQueues queues, int size)
+    throws IOException, InterruptedException {
+      this.context = context;
+      this.queues = queues;
+      this.size = size;
+      this.setDaemon(true);
+      this.setName("QueueFeeder");
+      hasMore = context.nextKey();
+      if (hasMore) {
+        currentIter = context.getValues().iterator();
+      }
+    }
+
+    @Override
+    public void run() {
+      int cnt = 0;
+      try {
+        while (hasMore) {
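+          // top the queues up to the target size while input remains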
+          int feed = size - queues.getTotalSize();
+          if (feed <= 0) {
+            // queues are full - spin-wait until they have some free space
+            try {
+              Thread.sleep(1000);
+            } catch (final Exception e) {}
+            continue;
+          }
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("-feeding " + feed + " input urls ...");
+          }
+          while (feed > 0 && currentIter.hasNext()) {
+            FetchEntry entry = new FetchEntry();
+            // since currentIter.next() reuses the same
+            // FetchEntry object we need to clone it
+            Writables.copyWritable(currentIter.next(), entry);
+            WebTableRow row = new WebTableRow(entry.getRow());
+            final String url =
+              TableUtil.unreverseUrl(Bytes.toString(entry.getKey().get()));
+            queues.addFetchItem(url, row);
+            feed--;
+            cnt++;
+          }
+          if (currentIter.hasNext()) {
+            continue; // finish items in current list before reading next key
+          }
+          hasMore = context.nextKey();
+          if (hasMore) {
+            currentIter = context.getValues().iterator();
+          }
+        }
+      } catch (Exception e) {
+        LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
+        return;
+      }
+      LOG.info("QueueFeeder finished: total " + cnt + " records.");
+    }
+  }
+
+  @Override
+  public void run(Context context)
+  throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.fetchQueues = new FetchItemQueues(conf);
+    final int threadCount = conf.getInt("fetcher.threads.fetch", 10);
+    LOG.info("FetcherHbase: threads: " + threadCount);
+
+    // set non-blocking & no-robots mode for HTTP protocol plugins.
+    conf.setBoolean(Protocol.CHECK_BLOCKING, false);
+    conf.setBoolean(Protocol.CHECK_ROBOTS, false);
+
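+    // keep up to 50 fetch items queued per fetcher thread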
+    feeder = new QueueFeeder(context, fetchQueues, threadCount * 50);
+    feeder.start();
+
+    for (int i = 0; i < threadCount; i++) {       // spawn threads
+      FetcherThread ft = new FetcherThread(context, i);
+      fetcherThreads.add(ft);
+      ft.start();
+    }
+    // use half the task timeout so hung fetcher threads are caught before the task itself times out
+    final long timeout = conf.getInt("mapred.task.timeout", 10*60*1000)/2;
+
+    do {                                          // wait for threads to exit
+      try {
+        Thread.sleep(10000);
+      } catch (final InterruptedException e) {}
+
+      context.progress();
+      LOG.info("-activeThreads=" + activeThreads + ", spinWaiting=" + spinWaiting.get()
+          + ", fetchQueues= " + fetchQueues.getQueueCount() +", fetchQueues.totalSize=" + fetchQueues.getTotalSize());
+
+      if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
+        fetchQueues.dump();
+      }
+      // some requests seem to hang, despite all intentions
+      if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
+        LOG.warn("Aborting with " + activeThreads + " hung threads.");
+        return;
+      }
+
+    } while (activeThreads.get() > 0);
+    LOG.info("-activeThreads=" + activeThreads);
+  }
+}

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/fetcher/PartitionUrlByHost.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,23 @@
+package org.apache.nutch.fetcher;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class PartitionUrlByHost
+extends Partitioner<ImmutableBytesWritable, FetchEntry> {
+
+  @Override
+  public int getPartition(ImmutableBytesWritable key,
+      FetchEntry value, int numPartitions) {
+    // FetchEntry keys are reversed URLs (see QueueFeeder); restore the original form first
+    String urlString = TableUtil.unreverseUrl(Bytes.toString(value.getKey().get()));
+
+    URL url = null;
+    try {
+      url = new URL(urlString);
+    } catch (MalformedURLException e) {
+      // malformed URL: fall back to hashing the whole URL string
+    }
+
+    // hash by host so all URLs of one host fall into the same partition
+    int hashCode = (url == null ? urlString : url.getHost()).hashCode();
+
+    return (hashCode & Integer.MAX_VALUE) % numPartitions;
+  }
+}

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/DeleteDuplicates.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/DeleteDuplicates.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/DeleteDuplicates.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/DeleteDuplicates.java Sun Aug 16 22:25:12 2009
@@ -31,7 +31,7 @@
 import org.apache.hadoop.util.*;
 
 import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.NutchJobConf;
 
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.document.DateTools;
@@ -424,7 +424,7 @@
       new Path("dedup-urls-"+
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
-    JobConf job = new NutchJob(getConf());
+    JobConf job = new NutchJobConf(getConf());
 
     for (int i = 0; i < indexDirs.length; i++) {
       if (LOG.isInfoEnabled()) {
@@ -450,7 +450,7 @@
     Path outDir2 =
       new Path("dedup-hash-"+
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-    job = new NutchJob(getConf());
+    job = new NutchJobConf(getConf());
     job.setJobName("dedup 2: content by hash");
 
     FileInputFormat.addInputPath(job, outDir1);
@@ -472,7 +472,7 @@
     // remove outDir1 - no longer needed
     fs.delete(outDir1, true);
     
-    job = new NutchJob(getConf());
+    job = new NutchJobConf(getConf());
     job.setJobName("dedup 3: delete from index(es)");
 
     FileInputFormat.addInputPath(job, outDir2);

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/Indexer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/Indexer.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/Indexer.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/Indexer.java Sun Aug 16 22:25:12 2009
@@ -1,104 +1,161 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.nutch.indexer;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.ValueFilter;
+import org.apache.hadoop.hbase.filter.FilterList.Operator;
+import org.apache.hadoop.hbase.filter.ValueFilter.CompareOp;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.indexer.lucene.LuceneWriter;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.scoring.ScoringFilters;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
-
-/** Create indexes for segments. */
-public class Indexer extends Configured implements Tool {
+import org.apache.nutch.util.hbase.HbaseColumn;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.WebTableColumns;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class Indexer
+extends TableMapper<ImmutableBytesWritable, WebTableRow> 
+implements Tool {
 
   public static final String DONE_NAME = "index.done";
-
+  
   public static final Log LOG = LogFactory.getLog(Indexer.class);
 
-  public Indexer() {
-    super(null);
+  private static final Collection<HbaseColumn> COLUMNS = new HashSet<HbaseColumn>();
+  
+  public static final byte[] INDEX_MARK = Bytes.toBytes("__idxmrk__");
+  
+  private Configuration conf;
+
+  static {
+    COLUMNS.add(new HbaseColumn(WebTableColumns.SIGNATURE));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.PARSE_STATUS));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.SCORE));
+    COLUMNS.add(new HbaseColumn(WebTableColumns.METADATA, INDEX_MARK));
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+  
+  @Override
+  public void map(ImmutableBytesWritable key, Result result, Context context)
+  throws IOException, InterruptedException {
+    WebTableRow row = new WebTableRow(result);
+
+    ParseStatus pstatus = row.getParseStatus();
+    if (!pstatus.isSuccess() ||
+        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+      return;   // skip rows that were not parsed successfully or are redirects
+    }
+
+    context.write(key, row);
   }
 
-  public Indexer(Configuration conf) {
-    super(conf);
+  public static Collection<HbaseColumn> getColumns(Job job) {
+    Configuration conf = job.getConfiguration();
+    Collection<HbaseColumn> columns = new HashSet<HbaseColumn>(COLUMNS);
+    IndexingFilters filters = new IndexingFilters(conf);
+    columns.addAll(filters.getColumns());
+    ScoringFilters scoringFilters = new ScoringFilters(conf);
+    columns.addAll(scoringFilters.getColumns());
+    return columns;  
+  }
+  
+  public static Job createIndexJob(Configuration conf, String jobName,
+      String table, boolean reindex) throws IOException {
+    Job job = new NutchJob(conf, jobName);
+    Scan scan = TableUtil.createScanFromColumns(getColumns(job));
+    List<Filter> filters = new ArrayList<Filter>();
+    if (!reindex) {
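+      // unless reindexing, skip rows that already carry the index mark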
+      filters.add(
+          new ValueFilter(WebTableColumns.METADATA, INDEX_MARK,
+              CompareOp.NOT_EQUAL, TableUtil.YES_VAL, false));
+    }
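+    // only rows that have a parse status (i.e. have been parsed) are indexed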
+    filters.add(
+        new ValueFilter(WebTableColumns.PARSE_STATUS, null,
+                        CompareOp.GREATER_OR_EQUAL, new byte[] { (byte)0 }, true));
+    FilterList filterList = new FilterList(Operator.MUST_PASS_ALL, filters);
+    scan.setFilter(filterList);
+    TableMapReduceUtil.initTableMapperJob(table,
+        scan,
+        Indexer.class, ImmutableBytesWritable.class,
+        WebTableRow.class, job);
+
+    job.setReducerClass(IndexerReducer.class);
+    job.setOutputFormatClass(IndexerOutputFormat.class);
+    return job;
   }
 
-  public void index(Path luceneDir, Path crawlDb,
-                    Path linkDb, List<Path> segments)
-  throws IOException {
-    LOG.info("Indexer: starting");
+  private void index(Path indexDir, String table, boolean reindex) throws Exception {
+    LOG.info("IndexerHbase: starting");
+    LOG.info("IndexerHbase: table: " + table);
 
-    final JobConf job = new NutchJob(getConf());
-    job.setJobName("index-lucene " + luceneDir);
+    LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, getConf());
+    LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, getConf());
+    LuceneWriter.addFieldOptions("boost", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, getConf());
 
-    IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
+    NutchIndexWriterFactory.addClassToConf(getConf(), LuceneWriter.class);
 
-    FileOutputFormat.setOutputPath(job, luceneDir);
+    Job job = createIndexJob(getConf(), "index " + table, table, reindex); 
 
-    LuceneWriter.addFieldOptions("segment", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
-    LuceneWriter.addFieldOptions("digest", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
-    LuceneWriter.addFieldOptions("boost", LuceneWriter.STORE.YES, LuceneWriter.INDEX.NO, job);
+    FileOutputFormat.setOutputPath(job, indexDir);
 
-    NutchIndexWriterFactory.addClassToConf(job, LuceneWriter.class);
 
-    JobClient.runJob(job);
-    LOG.info("Indexer: done");
+    job.waitForCompletion(true);
+    LOG.info("IndexerHbase: done");
   }
 
   public int run(String[] args) throws Exception {
-    if (args.length < 4) {
-      System.err.println("Usage: Indexer <index> <crawldb> <linkdb> <segment> ...");
-      return -1;
-    }
+    String usage = "Usage: IndexerHbase <index> <webtable> [-reindex]";
 
-    final Path luceneDir = new Path(args[0]);
-    final Path crawlDb = new Path(args[1]);
-    final Path linkDb = new Path(args[2]);
-
-    final List<Path> segments = new ArrayList<Path>();
-    for (int i = 3; i < args.length; i++) {
-      segments.add(new Path(args[i]));
+    if (args.length < 2) {
+      System.err.println(usage);
+      return -1;
     }
-
-    try {
-      index(luceneDir, crawlDb, linkDb, segments);
-      return 0;
-    } catch (final Exception e) {
-      LOG.fatal("Indexer: " + StringUtils.stringifyException(e));
-      return -1;
+    
+    boolean reindex = false;
+    if (args.length >= 3 && "-reindex".equals(args[2])) {
+      reindex = true;
     }
+
+    index(new Path(args[0]), args[1], reindex);
+    return 0;
   }
 
   public static void main(String[] args) throws Exception {
-    final int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new Indexer(), args);
     System.exit(res);
   }
+
 }

Modified: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java?rev=804789&r1=804788&r2=804789&view=diff
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java (original)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java Sun Aug 16 22:25:12 2009
@@ -18,41 +18,41 @@
 
 import java.io.IOException;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.nutch.indexer.NutchDocument;
 
-public class IndexerOutputFormat extends FileOutputFormat<Text, NutchDocument> {
+public class IndexerOutputFormat
+extends FileOutputFormat<WritableComparable<?>, NutchDocument> {
 
   @Override
-  public RecordWriter<Text, NutchDocument> getRecordWriter(FileSystem ignored,
-      JobConf job, String name, Progressable progress) throws IOException {
-    
-    // populate JobConf with field indexing options
-    IndexingFilters filters = new IndexingFilters(job);
-    
+  public RecordWriter<WritableComparable<?>, NutchDocument> getRecordWriter(
+      TaskAttemptContext job) throws IOException, InterruptedException {
+
     final NutchIndexWriter[] writers =
-      NutchIndexWriterFactory.getNutchIndexWriters(job);
+      NutchIndexWriterFactory.getNutchIndexWriters(job.getConfiguration());
 
     for (final NutchIndexWriter writer : writers) {
-      writer.open(job, name);
+      writer.open(job, FileOutputFormat.getUniqueFile(job, "part", ""));
     }
-    return new RecordWriter<Text, NutchDocument>() {
 
-      public void close(Reporter reporter) throws IOException {
+    return new RecordWriter<WritableComparable<?>, NutchDocument>() {
+
+      @Override
+      public void write(WritableComparable<?> key, NutchDocument doc) throws IOException {
         for (final NutchIndexWriter writer : writers) {
-          writer.close();
+          writer.write(doc);
         }
       }
 
-      public void write(Text key, NutchDocument doc) throws IOException {
+      @Override
+      public void close(TaskAttemptContext context) throws IOException,
+      InterruptedException {
         for (final NutchIndexWriter writer : writers) {
-          writer.write(doc);
-        }
+          writer.close();
+        }      
       }
     };
   }

Added: lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerReducer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerReducer.java?rev=804789&view=auto
==============================================================================
--- lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerReducer.java (added)
+++ lucene/nutch/branches/nutchbase/src/java/org/apache/nutch/indexer/IndexerReducer.java Sun Aug 16 22:25:12 2009
@@ -0,0 +1,87 @@
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.hbase.WebTableRow;
+import org.apache.nutch.util.hbase.TableUtil;
+
+public class IndexerReducer
+extends Reducer<ImmutableBytesWritable, WebTableRow, ImmutableBytesWritable, NutchDocument> {
+
+  public static final Log LOG = Indexer.LOG;
+  
+  private IndexingFilters filters;
+  
+  private ScoringFilters scoringFilters;
+  
+  private HTable table;
+  
+  @Override
+  protected void setup(Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    filters = new IndexingFilters(conf);
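+    // open the webtable so reduce() can write the index mark back to each row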
+    table = new HTable(conf.get(TableInputFormat.INPUT_TABLE));
+    scoringFilters = new ScoringFilters(conf);
+  }
+  
+  @Override
+  protected void reduce(ImmutableBytesWritable key, Iterable<WebTableRow> values,
+      Context context) throws IOException, InterruptedException {
+    WebTableRow row = values.iterator().next();
+    NutchDocument doc = new NutchDocument();
+
+    doc.add("id", Bytes.toString(key.get()));
+    doc.add("digest", StringUtil.toHexString(row.getSignature()));
+
+    String url = TableUtil.unreverseUrl(Bytes.toString(key.get()));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Indexing URL: " + url);
+    }
+
+    try {
+      doc = filters.filter(doc, url, row);
+    } catch (IndexingException e) {
+      LOG.warn("Error indexing "+key+": "+e);
+      return;
+    }
+
+    // skip documents discarded by indexing filters
+    if (doc == null) return;
+
+    float boost = 1.0f;
+    // run scoring filters
+    try {
+      boost = scoringFilters.indexerScore(url, doc, row, boost);
+    } catch (final ScoringFilterException e) {
+      LOG.warn("Error calculating score " + key + ": " + e);
+      return;
+    }
+
+    doc.setScore(boost);
+    // store boost for use by explain and dedup
+    doc.add("boost", Float.toString(boost));
+
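+    // mark the row as indexed and write the change back to the webtable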
+    row.putMeta(Indexer.INDEX_MARK, TableUtil.YES_VAL);
+    row.makeRowMutation().commit(table);
+    context.write(key, doc);
+  }
+  
+  @Override
+  public void cleanup(Context context) throws IOException {
+    table.close();
+  }
+
+}