Posted to commits@nutch.apache.org by ab...@apache.org on 2006/12/28 01:03:05 UTC

svn commit: r490607 - in /lucene/nutch/trunk: ./ conf/ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/ src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/metadata/ src/java/org/apache/nutch/segment/ src/java/org/apache/nut...

Author: ab
Date: Wed Dec 27 16:03:04 2006
New Revision: 490607

URL: http://svn.apache.org/viewvc?view=rev&rev=490607
Log:
This patch addresses several issues:

* NUTCH-415 - Generator should mark selected records in CrawlDb.
  Because it increases resource consumption, this step is optional.
  Application-level locking has been added to prevent concurrent
  modification of databases (a sketch of the locking helper follows
  this list).

* NUTCH-416 - CrawlDatum status and CrawlDbReducer refactoring. It is
  now possible to correctly update CrawlDb from multiple segments.
  New status codes are introduced for temporary and permanent
  redirection.

* NUTCH-322 - Fix Fetcher to store redirected pages and to store the
  protocol-level status. This should also fix NUTCH-273.

* Change the default Fetcher behavior so that redirects are not followed
  immediately. Instead, Fetcher records redirect targets as new pages to
  be added to CrawlDb. This also partially addresses NUTCH-273.

* Detect and report when Generator creates 0-sized segments.

* Fix Injector to preserve an already existing CrawlDatum when the seed
  list being injected also contains that URL.

This development was partially supported by SiteSell Inc.
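
LockUtil.java is added "(with props)" below, so its body does not appear
in this diff. For orientation, a minimal sketch of what such an
application-level locking helper might look like, assuming only the
createLockFile/removeLockFile call sites visible in the diffs that
follow; everything else in this sketch is an assumption:

  import java.io.IOException;

  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class LockUtil {

    /** Create a lock file; fail if one already exists, unless force is set. */
    public static void createLockFile(FileSystem fs, Path lockFile, boolean force)
        throws IOException {
      if (fs.exists(lockFile)) {
        if (!force)
          throw new IOException("Lock file already exists: " + lockFile);
        return;  // forced: proceed despite the existing lock
      }
      fs.create(lockFile).close();  // create an empty marker file
    }

    /** Remove a previously created lock file. */
    public static boolean removeLockFile(FileSystem fs, Path lockFile)
        throws IOException {
      if (!fs.exists(lockFile)) return false;
      return fs.delete(lockFile);  // single-argument delete, as used elsewhere in this commit
    }
  }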


Added:
    lucene/nutch/trunk/src/java/org/apache/nutch/util/LockUtil.java   (with props)
Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/conf/nutch-default.xml
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDb.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
    lucene/nutch/trunk/src/java/org/apache/nutch/crawl/MapWritable.java
    lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
    lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java
    lucene/nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
    lucene/nutch/trunk/src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java
    lucene/nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDBTestUtil.java
    lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java
    lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java
    lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java
    lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Wed Dec 27 16:03:04 2006
@@ -92,6 +92,29 @@
     
 30. NUTCH-406 - Metadata tries to write null values (mattmann)
 
+31. NUTCH-415 - Generator should mark selected records in CrawlDb.
+    Because it increases resource consumption, this step is optional.
+    Application-level locking has been added to prevent concurrent
+    modification of databases. (ab)
+
+32. NUTCH-416 - CrawlDatum status and CrawlDbReducer refactoring. It is
+    now possible to correctly update CrawlDb from multiple segments.
+    New status codes are introduced for temporary and permanent
+    redirection. (ab)
+
+33. NUTCH-322 - Fix Fetcher to store redirected pages and to store the
+    protocol-level status. This should also fix NUTCH-273. (ab)
+
+34. Change the default Fetcher behavior so that redirects are not followed
+    immediately. Instead, Fetcher records redirect targets as new pages to
+    be added to CrawlDb. This also partially addresses NUTCH-273. (ab)
+
+35. Detect and report when Generator creates 0-sized segments. (ab)
+
+36. Fix Injector to preserve an already existing CrawlDatum when the seed
+    list being injected also contains that URL. (ab)
+
+
 Release 0.8 - 2006-07-25
 
  0. Totally new architecture, based on hadoop

Modified: lucene/nutch/trunk/conf/nutch-default.xml
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/conf/nutch-default.xml?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/conf/nutch-default.xml (original)
+++ lucene/nutch/trunk/conf/nutch-default.xml Wed Dec 27 16:03:04 2006
@@ -147,9 +147,11 @@
 
 <property>
   <name>http.redirect.max</name>
-  <value>3</value>
+  <value>0</value>
   <description>The maximum number of redirects the fetcher will follow when
-    trying to fetch a page.</description>
+  trying to fetch a page. If set to 0 or a negative value, the fetcher won't
+  immediately follow redirected URLs but will record them for later fetching.
+  </description>
 </property>
 
 <property>
@@ -377,6 +379,17 @@
   remote DNS servers, not to mention increased external traffic
   and latency. For these reasons when using this option it is
   required that a local caching DNS be used.</description>
+</property>
+
+<property>
+  <name>generate.update.crawldb</name>
+  <value>false</value>
+  <description>For highly concurrent environments, where several
+  generate/fetch/update cycles may overlap, setting this to true ensures
+  that generate will create different fetchlists even without intervening
+  updatedb runs, at the cost of running an additional job to update CrawlDb.
+  If false, running generate twice without an intervening
+  updatedb will produce identical fetchlists.</description>
 </property>
 
 <!-- fetcher properties -->

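Two behavioral defaults change here: http.redirect.max drops to 0
(record redirects instead of following them) and generate.update.crawldb
is introduced, off by default. A small, hypothetical standalone check of
how these values are read; the property names are from the diff above,
the class itself is illustrative only:

  import org.apache.hadoop.conf.Configuration;

  public class NewDefaultsCheck {
    public static void main(String[] args) {
      Configuration conf = new Configuration();  // empty conf; defaults supplied inline
      // <= 0 now means: don't follow redirects inside the fetch, record them instead.
      int maxRedirect = conf.getInt("http.redirect.max", 0);
      // true would add an extra job after generate to mark selected records.
      boolean markSelected = conf.getBoolean("generate.update.crawldb", false);
      System.out.println("follow redirects immediately: " + (maxRedirect > 0));
      System.out.println("mark generated records in CrawlDb: " + markSelected);
    }
  }
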
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Crawl.java Wed Dec 27 16:03:04 2006
@@ -116,15 +116,15 @@
       
     for (int i = 0; i < depth; i++) {             // generate new segment
       Path segment = generator.generate(crawlDb, segments, -1, topN, System
-          .currentTimeMillis(), false);
+          .currentTimeMillis(), false, false);
       fetcher.fetch(segment, threads);  // fetch it
       if (!Fetcher.isParsing(job)) {
         parseSegment.parse(segment);    // parse it, if needed
       }
-      crawlDbTool.update(crawlDb, segment, true, true); // update crawldb
+      crawlDbTool.update(crawlDb, new Path[]{segment}, true, true); // update crawldb
     }
       
-    linkDbTool.invert(linkDb, segments, true, true); // invert links
+    linkDbTool.invert(linkDb, segments, true, true, false); // invert links
 
     // index, dedup & merge
     indexer.index(indexes, crawlDb, linkDb, fs.listPaths(segments));

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDatum.java Wed Dec 27 16:03:04 2006
@@ -25,34 +25,86 @@
 
 /* The crawl state of a url. */
 public class CrawlDatum implements WritableComparable, Cloneable {
-  public static final String DB_DIR_NAME = "current";
-
   public static final String GENERATE_DIR_NAME = "crawl_generate";
   public static final String FETCH_DIR_NAME = "crawl_fetch";
   public static final String PARSE_DIR_NAME = "crawl_parse";
 
-  private final static byte CUR_VERSION = 4;
-
-  public static final byte STATUS_SIGNATURE = 0;
-  public static final byte STATUS_DB_UNFETCHED = 1;
-  public static final byte STATUS_DB_FETCHED = 2;
-  public static final byte STATUS_DB_GONE = 3;
-  public static final byte STATUS_LINKED = 4;
-  public static final byte STATUS_FETCH_SUCCESS = 5;
-  public static final byte STATUS_FETCH_RETRY = 6;
-  public static final byte STATUS_FETCH_GONE = 7;
-  
-  public static final String[] statNames = {
-    "signature",
-    "DB_unfetched",
-    "DB_fetched",
-    "DB_gone",
-    "linked",
-    "fetch_success",
-    "fetch_retry",
-    "fetch_gone"
-  };
+  private final static byte CUR_VERSION = 5;
 
+  /** Compatibility values for on-the-fly conversion from versions < 5. */
+  private static final byte OLD_STATUS_SIGNATURE = 0;
+  private static final byte OLD_STATUS_DB_UNFETCHED = 1;
+  private static final byte OLD_STATUS_DB_FETCHED = 2;
+  private static final byte OLD_STATUS_DB_GONE = 3;
+  private static final byte OLD_STATUS_LINKED = 4;
+  private static final byte OLD_STATUS_FETCH_SUCCESS = 5;
+  private static final byte OLD_STATUS_FETCH_RETRY = 6;
+  private static final byte OLD_STATUS_FETCH_GONE = 7;
+  
+  private static HashMap<Byte, Byte> oldToNew = new HashMap<Byte, Byte>();
+  
+  /** Page was not fetched yet. */
+  public static final byte STATUS_DB_UNFETCHED      = 0x01;
+  /** Page was successfully fetched. */
+  public static final byte STATUS_DB_FETCHED        = 0x02;
+  /** Page no longer exists. */
+  public static final byte STATUS_DB_GONE           = 0x03;
+  /** Page temporarily redirects to other page. */
+  public static final byte STATUS_DB_REDIR_TEMP     = 0x04;
+  /** Page permanently redirects to other page. */
+  public static final byte STATUS_DB_REDIR_PERM     = 0x05;
+  
+  /** Maximum value of DB-related status. */
+  public static final byte STATUS_DB_MAX            = 0x1f;
+  
+  /** Fetching was successful. */
+  public static final byte STATUS_FETCH_SUCCESS     = 0x21;
+  /** Fetching unsuccessful, needs to be retried (transient errors). */
+  public static final byte STATUS_FETCH_RETRY       = 0x22;
+  /** Fetching temporarily redirected to other page. */
+  public static final byte STATUS_FETCH_REDIR_TEMP  = 0x23;
+  /** Fetching permanently redirected to other page. */
+  public static final byte STATUS_FETCH_REDIR_PERM  = 0x24;
+  /** Fetching unsuccessful - page is gone. */
+  public static final byte STATUS_FETCH_GONE        = 0x25;
+  
+  /** Maximum value of fetch-related status. */
+  public static final byte STATUS_FETCH_MAX         = 0x3f;
+  
+  /** Page signature. */
+  public static final byte STATUS_SIGNATURE         = 0x41;
+  /** Page was newly injected. */
+  public static final byte STATUS_INJECTED          = 0x42;
+  /** Page discovered through a link. */
+  public static final byte STATUS_LINKED            = 0x43;
+  
+  
+  public static final HashMap<Byte, String> statNames = new HashMap<Byte, String>();
+  static {
+    statNames.put(STATUS_DB_UNFETCHED, "db_unfetched");
+    statNames.put(STATUS_DB_FETCHED, "db_fetched");
+    statNames.put(STATUS_DB_GONE, "db_gone");
+    statNames.put(STATUS_DB_REDIR_TEMP, "db_redir_temp");
+    statNames.put(STATUS_DB_REDIR_PERM, "db_redir_perm");
+    statNames.put(STATUS_SIGNATURE, "signature");
+    statNames.put(STATUS_INJECTED, "injected");
+    statNames.put(STATUS_LINKED, "linked");
+    statNames.put(STATUS_FETCH_SUCCESS, "fetch_success");
+    statNames.put(STATUS_FETCH_RETRY, "fetch_retry");
+    statNames.put(STATUS_FETCH_REDIR_TEMP, "fetch_redir_temp");
+    statNames.put(STATUS_FETCH_REDIR_PERM, "fetch_redir_perm");
+    statNames.put(STATUS_FETCH_GONE, "fetch_gone");
+    
+    oldToNew.put(OLD_STATUS_DB_UNFETCHED, STATUS_DB_UNFETCHED);
+    oldToNew.put(OLD_STATUS_DB_FETCHED, STATUS_DB_FETCHED);
+    oldToNew.put(OLD_STATUS_DB_GONE, STATUS_DB_GONE);
+    oldToNew.put(OLD_STATUS_FETCH_GONE, STATUS_FETCH_GONE);
+    oldToNew.put(OLD_STATUS_FETCH_SUCCESS, STATUS_FETCH_SUCCESS);
+    oldToNew.put(OLD_STATUS_FETCH_RETRY, STATUS_FETCH_RETRY);
+    oldToNew.put(OLD_STATUS_LINKED, STATUS_LINKED);
+    oldToNew.put(OLD_STATUS_SIGNATURE, STATUS_SIGNATURE);
+  }
+  
   private static final float MILLISECONDS_PER_DAY = 24 * 60 * 60 * 1000;
 
   private byte status;
@@ -63,6 +115,16 @@
   private byte[] signature = null;
   private long modifiedTime;
   private MapWritable metaData;
+  
+  public static boolean hasDbStatus(CrawlDatum datum) {
+    if (datum.status <= STATUS_DB_MAX) return true;
+    return false;
+  }
+
+  public static boolean hasFetchStatus(CrawlDatum datum) {
+    if (datum.status > STATUS_DB_MAX && datum.status <= STATUS_FETCH_MAX) return true;
+    return false;
+  }
 
   public CrawlDatum() {}
 
@@ -81,6 +143,13 @@
   //
 
   public byte getStatus() { return status; }
+  
+  public static String getStatusName(byte value) {
+    String res = statNames.get(value);
+    if (res == null) res = "unknown";
+    return res;
+  }
+  
   public void setStatus(int status) { this.status = (byte)status; }
 
   public long getFetchTime() { return fetchTime; }
@@ -174,6 +243,14 @@
         }
       }
     }
+    // translate status codes
+    if (version < 5) {
+      if (oldToNew.containsKey(status))
+        status = oldToNew.get(status);
+      else
+        status = STATUS_DB_UNFETCHED;
+      
+    }
   }
 
   /** The number of bytes into a CrawlDatum that the score is stored. */
@@ -285,7 +362,7 @@
   public String toString() {
     StringBuffer buf = new StringBuffer();
     buf.append("Version: " + CUR_VERSION + "\n");
-    buf.append("Status: " + getStatus() + " (" + statNames[getStatus()] + ")\n");
+    buf.append("Status: " + getStatus() + " (" + getStatusName(getStatus()) + ")\n");
     buf.append("Fetch time: " + new Date(getFetchTime()) + "\n");
     buf.append("Modified time: " + new Date(getModifiedTime()) + "\n");
     buf.append("Retries since fetch: " + getRetriesSinceFetch() + "\n");

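The rewritten status codes partition the byte space into ranges: DB
statuses up to STATUS_DB_MAX (0x1f), fetch statuses from 0x21 up to
STATUS_FETCH_MAX (0x3f), and auxiliary markers (signature, injected,
linked) above that. This is exactly what hasDbStatus() and
hasFetchStatus() test. A small illustrative demo; the constants are
copied from the diff, the demo class itself is hypothetical:

  public class StatusRangesDemo {
    static final byte STATUS_DB_MAX    = 0x1f;
    static final byte STATUS_FETCH_MAX = 0x3f;

    static boolean isDbStatus(byte status)    { return status <= STATUS_DB_MAX; }
    static boolean isFetchStatus(byte status) { return status > STATUS_DB_MAX && status <= STATUS_FETCH_MAX; }

    public static void main(String[] args) {
      byte dbRedirTemp    = 0x04;  // STATUS_DB_REDIR_TEMP
      byte fetchRedirPerm = 0x24;  // STATUS_FETCH_REDIR_PERM
      byte linked         = 0x43;  // STATUS_LINKED: neither a db nor a fetch status
      System.out.println(isDbStatus(dbRedirTemp));                      // true
      System.out.println(isFetchStatus(fetchRedirPerm));                // true
      System.out.println(isDbStatus(linked) || isFetchStatus(linked));  // false
    }
  }
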
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDb.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDb.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDb.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDb.java Wed Dec 27 16:03:04 2006
@@ -31,6 +31,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolBase;
 
+import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 
@@ -39,9 +40,14 @@
  * crawldb accordingly.
  */
 public class CrawlDb extends ToolBase {
+  public static final Log LOG = LogFactory.getLog(CrawlDb.class);
+
   public static final String CRAWLDB_ADDITIONS_ALLOWED = "db.update.additions.allowed";
 
-  public static final Log LOG = LogFactory.getLog(CrawlDb.class);
+  public static final String CURRENT_NAME = "current";
+  
+  public static final String LOCK_NAME = ".locked";
+
   
   public CrawlDb() {
     
@@ -51,17 +57,19 @@
     setConf(conf);
   }
 
-  public void update(Path crawlDb, Path segment, boolean normalize, boolean filter) throws IOException {
+  public void update(Path crawlDb, Path[] segments, boolean normalize, boolean filter) throws IOException {
     boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, true);    
-    update(crawlDb, segment, normalize, filter, additionsAllowed);
+    update(crawlDb, segments, normalize, filter, additionsAllowed, false);
   }
   
-  public void update(Path crawlDb, Path segment, boolean normalize, boolean filter, boolean additionsAllowed) throws IOException {
-    
+  public void update(Path crawlDb, Path[] segments, boolean normalize, boolean filter, boolean additionsAllowed, boolean force) throws IOException {
+    FileSystem fs = FileSystem.get(getConf());
+    Path lock = new Path(crawlDb, LOCK_NAME);
+    LockUtil.createLockFile(fs, lock, force);
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb update: starting");
       LOG.info("CrawlDb update: db: " + crawlDb);
-      LOG.info("CrawlDb update: segment: " + segment);
+      LOG.info("CrawlDb update: segments: " + Arrays.asList(segments));
       LOG.info("CrawlDb update: additions allowed: " + additionsAllowed);
       LOG.info("CrawlDb update: URL normalizing: " + normalize);
       LOG.info("CrawlDb update: URL filtering: " + filter);
@@ -71,13 +79,27 @@
     job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed);
     job.setBoolean(CrawlDbFilter.URL_FILTERING, filter);
     job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize);
-    job.addInputPath(new Path(segment, CrawlDatum.FETCH_DIR_NAME));
-    job.addInputPath(new Path(segment, CrawlDatum.PARSE_DIR_NAME));
+    for (int i = 0; i < segments.length; i++) {
+      Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
+      Path parse = new Path(segments[i], CrawlDatum.PARSE_DIR_NAME);
+      if (fs.exists(fetch) && fs.exists(parse)) {
+        job.addInputPath(fetch);
+        job.addInputPath(parse);
+      } else {
+        LOG.info(" - skipping invalid segment " + segments[i]);
+      }
+    }
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb update: Merging segment data into db.");
     }
-    JobClient.runJob(job);
+    try {
+      JobClient.runJob(job);
+    } catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      if (fs.exists(job.getOutputPath())) fs.delete(job.getOutputPath());
+      throw e;
+    }
 
     CrawlDb.install(job, crawlDb);
     if (LOG.isInfoEnabled()) { LOG.info("CrawlDb update: done"); }
@@ -93,7 +115,7 @@
     job.setJobName("crawldb " + crawlDb);
 
 
-    Path current = new Path(crawlDb, CrawlDatum.DB_DIR_NAME);
+    Path current = new Path(crawlDb, CURRENT_NAME);
     if (FileSystem.get(job).exists(current)) {
       job.addInputPath(current);
     }
@@ -114,7 +136,7 @@
     Path newCrawlDb = job.getOutputPath();
     FileSystem fs = new JobClient(job).getFs();
     Path old = new Path(crawlDb, "old");
-    Path current = new Path(crawlDb, CrawlDatum.DB_DIR_NAME);
+    Path current = new Path(crawlDb, CURRENT_NAME);
     if (fs.exists(current)) {
       if (fs.exists(old)) fs.delete(old);
       fs.rename(current, old);
@@ -122,6 +144,8 @@
     fs.mkdirs(crawlDb);
     fs.rename(newCrawlDb, current);
     if (fs.exists(old)) fs.delete(old);
+    Path lock = new Path(crawlDb, LOCK_NAME);
+    LockUtil.removeLockFile(fs, lock);
   }
 
   public static void main(String[] args) throws Exception {
@@ -131,9 +155,11 @@
 
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println("Usage: CrawlDb <crawldb> <segment> [-normalize] [-filter] [-noAdditions]");
+      System.err.println("Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> ...) [-force] [-normalize] [-filter] [-noAdditions]");
       System.err.println("\tcrawldb\tCrawlDb to update");
-      System.err.println("\tsegment\tsegment name to update from");
+      System.err.println("\t-dir segments\tparent directory containing all segments to update from");
+      System.err.println("\tseg1 seg2 ...\tlist of segment names to update from");
+      System.err.println("\t-force\tforce update even if CrawlDb appears to be locked (CAUTION advised)");
       System.err.println("\t-normalize\tuse URLNormalizer on urls in CrawlDb and segment (usually not needed)");
       System.err.println("\t-filter\tuse URLFilters on urls in CrawlDb and segment");
       System.err.println("\t-noAdditions\tonly update already existing URLs, don't add any newly discovered URLs");
@@ -141,20 +167,36 @@
     }
     boolean normalize = false;
     boolean filter = false;
+    boolean force = false;
+    final FileSystem fs = FileSystem.get(getConf());
     boolean additionsAllowed = getConf().getBoolean(CRAWLDB_ADDITIONS_ALLOWED, true);
-    if (args.length > 2) {
-      for (int i = 2; i < args.length; i++) {
-        if (args[i].equals("-normalize")) {
-          normalize = true;
-        } else if (args[i].equals("-filter")) {
-          filter = true;
-        } else if (args[i].equals("-noAdditions")) {
-          additionsAllowed = false;
-        }
+    HashSet<Path> dirs = new HashSet<Path>();
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-normalize")) {
+        normalize = true;
+      } else if (args[i].equals("-filter")) {
+        filter = true;
+      } else if (args[i].equals("-force")) {
+        force = true;
+      } else if (args[i].equals("-noAdditions")) {
+        additionsAllowed = false;
+      } else if (args[i].equals("-dir")) {
+        Path[] paths = fs.listPaths(new Path(args[++i]), new PathFilter() {
+          public boolean accept(Path dir) {
+            try {
+              return fs.isDirectory(dir);
+            } catch (IOException ioe) {
+              return false;
+            }
+          }
+        });
+        dirs.addAll(Arrays.asList(paths));
+      } else {
+        dirs.add(new Path(args[i]));
       }
     }
     try {
-      update(new Path(args[0]), new Path(args[1]), normalize, filter, additionsAllowed);
+      update(new Path(args[0]), dirs.toArray(new Path[dirs.size()]), normalize, filter, additionsAllowed, force);
       return 0;
     } catch (Exception e) {
       LOG.fatal("CrawlDb update: " + StringUtils.stringifyException(e));

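With the new signature, CrawlDb.update() accepts an array of segments,
so a single job can fold several fetch cycles into the db. A
hypothetical driver: the update() signature is from the diff above,
while the constructor usage, paths, and flag values are placeholders:

  import org.apache.hadoop.fs.Path;
  import org.apache.nutch.crawl.CrawlDb;
  import org.apache.nutch.util.NutchConfiguration;

  public class UpdateFromTwoSegments {
    public static void main(String[] args) throws Exception {
      CrawlDb crawlDbTool = new CrawlDb(NutchConfiguration.create());
      Path crawlDb = new Path("crawl/crawldb");
      Path[] segments = {
          new Path("crawl/segments/20061227160304"),
          new Path("crawl/segments/20061227170000")
      };
      // normalize, filter, additionsAllowed; force=false respects the .locked file
      crawlDbTool.update(crawlDb, segments, true, true, true, false);
    }
  }
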
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbMerger.java Wed Dec 27 16:03:04 2006
@@ -105,12 +105,12 @@
   public void merge(Path output, Path[] dbs, boolean normalize, boolean filter) throws Exception {
     JobConf job = createMergeJob(getConf(), output, normalize, filter);
     for (int i = 0; i < dbs.length; i++) {
-      job.addInputPath(new Path(dbs[i], CrawlDatum.DB_DIR_NAME));
+      job.addInputPath(new Path(dbs[i], CrawlDb.CURRENT_NAME));
     }
     JobClient.runJob(job);
     FileSystem fs = FileSystem.get(getConf());
     fs.mkdirs(output);
-    fs.rename(job.getOutputPath(), new Path(output, CrawlDatum.DB_DIR_NAME));
+    fs.rename(job.getOutputPath(), new Path(output, CrawlDb.CURRENT_NAME));
   }
 
   public static JobConf createMergeJob(Configuration conf, Path output, boolean normalize, boolean filter) {

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReader.java Wed Dec 27 16:03:04 2006
@@ -68,7 +68,7 @@
   private void openReaders(String crawlDb, Configuration config) throws IOException {
     if (readers != null) return;
     FileSystem fs = FileSystem.get(config);
-    readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb, CrawlDatum.DB_DIR_NAME), config);
+    readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb, CrawlDb.CURRENT_NAME), config);
   }
   
   private void closeReaders() {
@@ -243,7 +243,7 @@
     JobConf job = new NutchJob(config);
     job.setJobName("stats " + crawlDb);
 
-    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
+    job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(CrawlDbStatMapper.class);
@@ -301,10 +301,10 @@
         } else if (k.equals("scx")) {
           LOG.info("max score:\t" + (float) (val.get() / 1000.0f));
         } else if (k.equals("sct")) {
-          LOG.info("avg score:\t" + (float) ((float) (val.get() / totalCnt.get()) / 1000.0f));
+          LOG.info("avg score:\t" + (float) ((double) (val.get() / totalCnt.get()) / 1000.0));
         } else if (k.startsWith("status")) {
           int code = Integer.parseInt(k.substring(k.indexOf(' ') + 1));
-          LOG.info(k + " (" + CrawlDatum.statNames[code] + "):\t" + val);
+          LOG.info(k + " (" + CrawlDatum.getStatusName((byte)code) + "):\t" + val);
         } else LOG.info(k + ":\t" + val);
       }
     }
@@ -344,7 +344,7 @@
     JobConf job = new NutchJob(config);
     job.setJobName("dump " + crawlDb);
 
-    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
+    job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
     job.setInputKeyClass(Text.class);
     job.setInputValueClass(CrawlDatum.class);
@@ -373,10 +373,8 @@
 
     JobConf job = new NutchJob(config);
     job.setJobName("topN prepare " + crawlDb);
-    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
+    job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
-    job.setInputKeyClass(Text.class);
-    job.setInputValueClass(CrawlDatum.class);
     job.setMapperClass(CrawlDbTopNMapper.class);
     job.setReducerClass(IdentityReducer.class);
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/CrawlDbReducer.java Wed Dec 27 16:03:04 2006
@@ -27,12 +27,14 @@
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
+import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.scoring.ScoringFilterException;
 import org.apache.nutch.scoring.ScoringFilters;
 
 /** Merge new page entries with existing entries. */
 public class CrawlDbReducer implements Reducer {
   public static final Log LOG = LogFactory.getLog(CrawlDbReducer.class);
+  
   private int retryMax;
   private CrawlDatum result = new CrawlDatum();
   private ArrayList linked = new ArrayList();
@@ -51,60 +53,81 @@
                      OutputCollector output, Reporter reporter)
     throws IOException {
 
-    CrawlDatum highest = null;
+    CrawlDatum fetch = null;
     CrawlDatum old = null;
     byte[] signature = null;
     linked.clear();
 
     while (values.hasNext()) {
       CrawlDatum datum = (CrawlDatum)values.next();
+      if (CrawlDatum.hasDbStatus(datum)) {
+        if (old == null) {
+          old = datum;
+        } else {
+          // always take the latest version
+          if (old.getFetchTime() < datum.getFetchTime()) old = datum;
+        }
+        continue;
+      }
 
-      if (highest == null || datum.getStatus() > highest.getStatus()) {
-        highest = datum;                          // find highest status
+      if (CrawlDatum.hasFetchStatus(datum)) {
+        if (fetch == null) {
+          fetch = datum;
+        } else {
+          // always take the latest version
+          if (fetch.getFetchTime() < datum.getFetchTime()) fetch = datum;
+        }
+        continue;
       }
 
-      switch (datum.getStatus()) {                // find old entry, if any
-      case CrawlDatum.STATUS_DB_UNFETCHED:
-      case CrawlDatum.STATUS_DB_FETCHED:
-      case CrawlDatum.STATUS_DB_GONE:
-        old = datum;
-        break;
+      switch (datum.getStatus()) {                // collect other info
       case CrawlDatum.STATUS_LINKED:
         linked.add(datum);
         break;
       case CrawlDatum.STATUS_SIGNATURE:
         signature = datum.getSignature();
+        break;
+      default:
+        LOG.warn("Unknown status, key: " + key + ", datum: " + datum);
       }
     }
 
     // if it doesn't already exist, skip it
     if (old == null && !additionsAllowed) return;
     
-    // initialize with the latest version
-    result.set(highest);
+    // if there is no fetched datum, perhaps there is a link
+    if (fetch == null && linked.size() > 0) {
+      fetch = (CrawlDatum)linked.get(0);
+    }
+    
+    // still no new data - record only unchanged old data, if exists, and return
+    if (fetch == null) {
+      if (old != null) // at this point at least "old" should be present
+        output.collect(key, old);
+      else
+        LOG.warn("Missing fetch and old value, signature=" + signature);
+      return;
+    }
+    
+    // initialize with the latest version, be it fetch or link
+    result.set(fetch);
     if (old != null) {
       // copy metadata from old, if exists
       if (old.getMetaData().size() > 0) {
         result.getMetaData().putAll(old.getMetaData());
         // overlay with new, if any
-        if (highest.getMetaData().size() > 0)
-          result.getMetaData().putAll(highest.getMetaData());
+        if (fetch.getMetaData().size() > 0)
+          result.getMetaData().putAll(fetch.getMetaData());
       }
       // set the most recent valid value of modifiedTime
-      if (old.getModifiedTime() > 0 && highest.getModifiedTime() == 0) {
+      if (old.getModifiedTime() > 0 && fetch.getModifiedTime() == 0) {
         result.setModifiedTime(old.getModifiedTime());
       }
     }
+    
+    switch (fetch.getStatus()) {                // determine new status
 
-    switch (highest.getStatus()) {                // determine new status
-
-    case CrawlDatum.STATUS_DB_UNFETCHED:          // no new entry
-    case CrawlDatum.STATUS_DB_FETCHED:
-    case CrawlDatum.STATUS_DB_GONE:
-      result.set(old);                            // use old
-      break;
-
-    case CrawlDatum.STATUS_LINKED:                // highest was link
+    case CrawlDatum.STATUS_LINKED:                // it was link
       if (old != null) {                          // if old exists
         result.set(old);                          // use it
       } else {
@@ -122,11 +145,21 @@
       break;
       
     case CrawlDatum.STATUS_FETCH_SUCCESS:         // successful fetch
-      if (highest.getSignature() == null) result.setSignature(signature);
+      if (fetch.getSignature() == null) result.setSignature(signature);
       result.setStatus(CrawlDatum.STATUS_DB_FETCHED);
       result.setNextFetchTime();
       break;
 
+    case CrawlDatum.STATUS_FETCH_REDIR_TEMP:
+      if (fetch.getSignature() == null) result.setSignature(signature);
+      result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP);
+      result.setNextFetchTime();
+      break;
+    case CrawlDatum.STATUS_FETCH_REDIR_PERM:
+      if (fetch.getSignature() == null) result.setSignature(signature);
+      result.setStatus(CrawlDatum.STATUS_DB_REDIR_PERM);
+      result.setNextFetchTime();
+      break;
     case CrawlDatum.STATUS_SIGNATURE:
       if (LOG.isWarnEnabled()) {
         LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key);
@@ -135,7 +168,7 @@
     case CrawlDatum.STATUS_FETCH_RETRY:           // temporary failure
       if (old != null)
         result.setSignature(old.getSignature());  // use old signature
-      if (highest.getRetriesSinceFetch() < retryMax) {
+      if (fetch.getRetriesSinceFetch() < retryMax) {
         result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
       } else {
         result.setStatus(CrawlDatum.STATUS_DB_GONE);
@@ -149,7 +182,7 @@
       break;
 
     default:
-      throw new RuntimeException("Unknown status: " + highest.getStatus() + " " + key);
+      throw new RuntimeException("Unknown status: " + fetch.getStatus() + " " + key);
     }
 
     try {
@@ -159,6 +192,8 @@
         LOG.warn("Couldn't update score, key=" + key + ": " + e);
       }
     }
+    // remove generation time, if any
+    result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);
     output.collect(key, result);
   }
 

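The refactored reducer no longer picks the datum with the numerically
highest status code; for both db and fetch datums it keeps the one with
the latest fetch time, which is what makes updating from multiple
segments safe. A minimal sketch of that precedence rule, in plain Java
and independent of the Hadoop types above:

  import java.util.Arrays;
  import java.util.List;

  public class LatestWins {
    static class Datum {
      final long fetchTime; final String status;
      Datum(long fetchTime, String status) { this.fetchTime = fetchTime; this.status = status; }
    }

    /** Among competing datums for one URL, the latest fetch time wins. */
    static Datum latest(List<Datum> values) {
      Datum best = null;
      for (Datum d : values)
        if (best == null || best.fetchTime < d.fetchTime) best = d;
      return best;
    }

    public static void main(String[] args) {
      List<Datum> fromTwoSegments = Arrays.asList(
          new Datum(1000L, "fetch_redir_temp"),  // older segment
          new Datum(2000L, "fetch_success"));    // newer segment
      System.out.println(latest(fromTwoSegments).status);  // fetch_success
    }
  }
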
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Generator.java Wed Dec 27 16:03:04 2006
@@ -31,13 +31,16 @@
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolBase;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.nutch.metadata.Nutch;
 import org.apache.nutch.net.URLFilterException;
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.scoring.ScoringFilterException;
 import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 
@@ -47,8 +50,10 @@
   public static final String CRAWL_GENERATE_FILTER = "crawl.generate.filter";
   public static final String GENERATE_MAX_PER_HOST_BY_IP = "generate.max.per.host.by.ip";
   public static final String GENERATE_MAX_PER_HOST = "generate.max.per.host";
+  public static final String GENERATE_UPDATE_CRAWLDB = "generate.update.crawldb";
   public static final String CRAWL_TOP_N = "crawl.topN";
   public static final String CRAWL_GEN_CUR_TIME = "crawl.gen.curTime";
+  public static final String CRAWL_GEN_DELAY = "crawl.gen.delay";
   public static final Log LOG = LogFactory.getLog(Generator.class);
   
   public static class SelectorEntry implements Writable {
@@ -77,6 +82,7 @@
 
   /** Selects entries due for fetch. */
   public static class Selector implements Mapper, Partitioner, Reducer {
+    private LongWritable genTime = new LongWritable(System.currentTimeMillis());
     private long curTime;
     private long limit;
     private long count;
@@ -91,6 +97,8 @@
     private boolean byIP;
     private long dnsFailure = 0L;
     private boolean filter;
+    private long genDelay;
+    private boolean runUpdatedb;
 
     public void configure(JobConf job) {
       curTime = job.getLong(CRAWL_GEN_CUR_TIME, System.currentTimeMillis());
@@ -102,6 +110,10 @@
       scfilters = new ScoringFilters(job);
       hostPartitioner.configure(job);
       filter = job.getBoolean(CRAWL_GENERATE_FILTER, true);
+      genDelay = job.getLong(CRAWL_GEN_DELAY, 7L) * 3600L * 24L * 1000L;
+      long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
+      if (time > 0) genTime.set(time);
+      runUpdatedb = job.getBoolean(GENERATE_UPDATE_CRAWLDB, false);
     }
 
     public void close() {}
@@ -125,12 +137,18 @@
       }
       CrawlDatum crawlDatum = (CrawlDatum)value;
 
-      if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_GONE)
+      if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_GONE ||
+          crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM)
         return;                                   // don't retry
 
       if (crawlDatum.getFetchTime() > curTime)
         return;                                   // not time yet
 
+      LongWritable oldGenTime = (LongWritable)crawlDatum.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
+      if (oldGenTime != null) { // awaiting fetch & update
+        if (oldGenTime.get() + genDelay > curTime) // still wait for update
+          return;
+      }
       float sort = 1.0f;
       try {
         sort = scfilters.generatorSortValue((Text)key, crawlDatum, sort);
@@ -141,6 +159,8 @@
       }
       // sort by decreasing score, using DecreasingFloatComparator
       sortValue.set(sort);
+      // record generation time
+      crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
       entry.datum = crawlDatum;
       entry.url = (Text)key;
       output.collect(sortValue, entry);          // invert for sort by score
@@ -247,7 +267,7 @@
     public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
       SelectorEntry entry = (SelectorEntry)value;
       output.collect(entry.url, entry.datum);
-    }    
+    }
   }
   
   /** Sort fetch lists by hash of URL. */
@@ -286,6 +306,38 @@
     }
   }
 
+  /**
+   * Update the CrawlDB so that the next generate won't include the same URLs.
+   */
+  public static class CrawlDbUpdater extends MapReduceBase implements Mapper, Reducer {
+    public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {
+      if (key instanceof FloatWritable) { // tempDir source
+        SelectorEntry se = (SelectorEntry)value;
+        output.collect(se.url, se.datum);
+      } else {
+        output.collect(key, value);
+      }
+    }
+
+    public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
+      CrawlDatum orig = null;
+      LongWritable genTime = null;
+      while (values.hasNext()) {
+        CrawlDatum val = (CrawlDatum)values.next();
+        if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {
+          genTime = (LongWritable)val.getMetaData().get(Nutch.WRITABLE_GENERATE_TIME_KEY);
+        } else {
+          orig = val;
+        }
+      }
+      if (genTime != null) {
+        orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
+      }
+      output.collect(key, orig);
+    }
+    
+  }
+  
   public Generator() {
     
   }
@@ -298,21 +350,25 @@
   public Path generate(Path dbDir, Path segments)
     throws IOException {
     return generate(dbDir, segments, -1, Long.MAX_VALUE, System
-        .currentTimeMillis(), true);
+        .currentTimeMillis(), true, false);
   }
 
   /** Generate fetchlists in a segment. */
   public Path generate(Path dbDir, Path segments,
-                       int numLists, long topN, long curTime, boolean filter)
+                       int numLists, long topN, long curTime, boolean filter,
+                       boolean force)
     throws IOException {
 
     Path tempDir =
       new Path(getConf().get("mapred.temp.dir", ".") +
-               "/generate-temp-"+
-               Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+               "/generate-temp-"+ System.currentTimeMillis());
 
     Path segment = new Path(segments, generateSegmentName());
     Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME);
+    
+    Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
+    FileSystem fs = FileSystem.get(getConf());
+    LockUtil.createLockFile(fs, lock, force);
 
     LOG.info("Generator: Selecting best-scoring urls due for fetch.");
     LOG.info("Generator: starting");
@@ -322,7 +378,7 @@
       LOG.info("Generator: topN: " + topN);
     }
 
-    // map to inverted subset due for fetch, sort by link count
+    // map to inverted subset due for fetch, sort by score
     JobConf job = new NutchJob(getConf());
     job.setJobName("generate: select " + segment);
 
@@ -335,10 +391,12 @@
       numLists = 1;
     }
     job.setLong(CRAWL_GEN_CUR_TIME, curTime);
+    // record real generation time
+    job.setLong(Nutch.GENERATE_TIME_KEY, System.currentTimeMillis());
     job.setLong(CRAWL_TOP_N, topN);
     job.setBoolean(CRAWL_GENERATE_FILTER, filter);
 
-    job.setInputPath(new Path(dbDir, CrawlDatum.DB_DIR_NAME));
+    job.setInputPath(new Path(dbDir, CrawlDb.CURRENT_NAME));
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(Selector.class);
@@ -350,7 +408,22 @@
     job.setOutputKeyClass(FloatWritable.class);
     job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
     job.setOutputValueClass(SelectorEntry.class);
-    JobClient.runJob(job);
+    try {
+      JobClient.runJob(job);
+    } catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      throw e;
+    }
+    
+    // check that we selected at least some entries ...
+    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(job, tempDir);
+    if (readers == null || readers.length == 0 || !readers[0].next(new FloatWritable())) {
+      LOG.warn("Generator: 0 records selected for fetching, exiting ...");
+      LockUtil.removeLockFile(fs, lock);
+      fs.delete(tempDir);
+      return null;
+    }
+    for (int i = 0; i < readers.length; i++) readers[i].close();
 
     // invert again, partition by host, sort by url hash
     if (LOG.isInfoEnabled()) {
@@ -373,9 +446,43 @@
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(CrawlDatum.class);
     job.setOutputKeyComparatorClass(HashComparator.class);
-    JobClient.runJob(job);
-
-    new JobClient(getConf()).getFs().delete(tempDir);
+    try {
+      JobClient.runJob(job);
+    } catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      fs.delete(tempDir);
+      throw e;
+    }
+    if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {
+      // update the db from tempDir
+      Path tempDir2 =
+        new Path(getConf().get("mapred.temp.dir", ".") +
+                 "/generate-temp-"+ System.currentTimeMillis());
+  
+      job = new NutchJob(getConf());
+      job.setJobName("generate: updatedb " + dbDir);
+      job.addInputPath(tempDir);
+      job.addInputPath(new Path(dbDir, CrawlDb.CURRENT_NAME));
+      job.setInputFormat(SequenceFileInputFormat.class);
+      job.setMapperClass(CrawlDbUpdater.class);
+      job.setReducerClass(CrawlDbUpdater.class);
+      job.setOutputFormat(MapFileOutputFormat.class);
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(CrawlDatum.class);
+      job.setOutputPath(tempDir2);
+      try {
+        JobClient.runJob(job);
+        CrawlDb.install(job, dbDir);
+      } catch (IOException e) {
+        LockUtil.removeLockFile(fs, lock);
+        fs.delete(tempDir);
+        fs.delete(tempDir2);
+        throw e;
+      }
+      fs.delete(tempDir2);
+    }
+    LockUtil.removeLockFile(fs, lock);
+    fs.delete(tempDir);
 
     if (LOG.isInfoEnabled()) { LOG.info("Generator: done."); }
 
@@ -402,7 +509,7 @@
   
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.out.println("Usage: Generator <crawldb> <segments_dir> [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter]");
+      System.out.println("Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter]");
       return -1;
     }
 
@@ -412,6 +519,7 @@
     long topN = Long.MAX_VALUE;
     int numFetchers = -1;
     boolean filter = true;
+    boolean force = false;
 
     for (int i = 2; i < args.length; i++) {
       if ("-topN".equals(args[i])) {
@@ -425,13 +533,16 @@
         curTime += numDays * 1000L * 60 * 60 * 24;
       } else if ("-noFilter".equals(args[i])) {
         filter = false;
+      } else if ("-force".equals(args[i])) {
+        force = true;
       }
       
     }
 
     try {
-      generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter);
-      return 0;
+      Path seg = generate(dbDir, segmentsDir, numFetchers, topN, curTime, filter, force);
+      if (seg == null) return -2;
+      else return 0;
     } catch (Exception e) {
       LOG.fatal("Generator: " + StringUtils.stringifyException(e));
       return -1;

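The Selector now skips records that were generated recently but not yet
updated: the generation time stored in the datum's metadata must be
older than crawl.gen.delay (7 days by default) before the URL becomes
eligible again. A sketch of that guard, mirroring the logic in the
Selector.map() hunk above; the wrapper class is hypothetical:

  public class GenDelayGuard {
    /** Default delay of 7 days, matching crawl.gen.delay in the diff. */
    static final long GEN_DELAY = 7L * 24L * 3600L * 1000L;

    /** True if the URL was generated recently and should still be skipped. */
    static boolean skip(Long oldGenTime, long curTime) {
      return oldGenTime != null && oldGenTime + GEN_DELAY > curTime;
    }

    public static void main(String[] args) {
      long now = System.currentTimeMillis();
      long day = 24L * 3600L * 1000L;
      System.out.println(skip(now - 3 * day, now));  // true: generated 3 days ago, keep waiting
      System.out.println(skip(now - 8 * day, now));  // false: delay expired, eligible again
      System.out.println(skip(null, now));           // false: never generated
    }
  }
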
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java Wed Dec 27 16:03:04 2006
@@ -78,7 +78,7 @@
       }
       if (url != null) {                          // if it passes
         value.set(url);                           // collect it
-        CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED, interval);
+        CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_INJECTED, interval);
         datum.setScore(scoreInjected);
         try {
           scfilters.injectedScore(value, datum);
@@ -102,7 +102,22 @@
     public void reduce(WritableComparable key, Iterator values,
                        OutputCollector output, Reporter reporter)
       throws IOException {
-      output.collect(key, (Writable)values.next()); // just collect first value
+      CrawlDatum old = null;
+      CrawlDatum injected = null;
+      while (values.hasNext()) {
+        CrawlDatum val = (CrawlDatum)values.next();
+        if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
+          injected = val;
+          injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
+        } else {
+          old = val;
+        }
+      }
+      CrawlDatum res = null;
+      if (old != null) res = old; // don't overwrite existing value
+      else res = injected;
+
+      output.collect(key, res);
     }
   }
 

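The Injector reducer now distinguishes freshly injected entries (the new
STATUS_INJECTED marker) from entries already in the CrawlDb, and the
existing entry always wins. A simplified model of that merge rule; the
constants are copied from the CrawlDatum diff, the demo class is
hypothetical:

  import java.util.Arrays;
  import java.util.List;

  public class InjectMergeDemo {
    static final byte STATUS_DB_UNFETCHED = 0x01;
    static final byte STATUS_DB_FETCHED   = 0x02;
    static final byte STATUS_INJECTED     = 0x42;

    /** Existing entry wins; a lone injected entry is demoted to db_unfetched. */
    static byte merge(List<Byte> statuses) {
      Byte old = null, injected = null;
      for (byte s : statuses) {
        if (s == STATUS_INJECTED) injected = STATUS_DB_UNFETCHED;
        else old = s;
      }
      return old != null ? old : injected;
    }

    public static void main(String[] args) {
      // URL already db_fetched and also present in the seed list: keep the old datum.
      System.out.println(merge(Arrays.asList(STATUS_DB_FETCHED, STATUS_INJECTED)));  // 2
      // Brand-new URL: the injected entry becomes db_unfetched.
      System.out.println(merge(Arrays.asList(STATUS_INJECTED)));                     // 1
    }
  }
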
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/LinkDb.java Wed Dec 27 16:03:04 2006
@@ -36,6 +36,7 @@
 import org.apache.nutch.net.URLFilters;
 import org.apache.nutch.net.URLNormalizers;
 import org.apache.nutch.parse.*;
+import org.apache.nutch.util.LockUtil;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.NutchJob;
 
@@ -44,7 +45,8 @@
 
   public static final Log LOG = LogFactory.getLog(LinkDb.class);
 
-  public static String CURRENT_NAME = "current";
+  public static final String CURRENT_NAME = "current";
+  public static final String LOCK_NAME = ".locked";
 
   private int maxAnchorLength;
   private int maxInlinks;
@@ -178,20 +180,11 @@
                      OutputCollector output, Reporter reporter)
     throws IOException {
 
-    Inlinks result = null;
+    Inlinks result = new Inlinks();
 
     while (values.hasNext()) {
       Inlinks inlinks = (Inlinks)values.next();
 
-      if (result == null) {                       // optimize a common case
-        if (inlinks.size() < maxInlinks) {
-          result = inlinks;
-          continue;
-        } else {
-          result = new Inlinks();
-        }
-      }
-
       int end = Math.min(maxInlinks - result.size(), inlinks.size());
       Iterator it = inlinks.iterator();
       int i = 0;
@@ -199,10 +192,11 @@
         result.add((Inlink)it.next());
       }
     }
+    if (result.size() == 0) return;
     output.collect(key, result);
   }
 
-  public void invert(Path linkDb, final Path segmentsDir, boolean normalize, boolean filter) throws IOException {
+  public void invert(Path linkDb, final Path segmentsDir, boolean normalize, boolean filter, boolean force) throws IOException {
     final FileSystem fs = FileSystem.get(getConf());
     Path[] files = fs.listPaths(segmentsDir, new PathFilter() {
       public boolean accept(Path f) {
@@ -212,11 +206,14 @@
         return false;
       }
     });
-    invert(linkDb, files, normalize, filter);
+    invert(linkDb, files, normalize, filter, force);
   }
 
-  public void invert(Path linkDb, Path[] segments, boolean normalize, boolean filter) throws IOException {
+  public void invert(Path linkDb, Path[] segments, boolean normalize, boolean filter, boolean force) throws IOException {
 
+    Path lock = new Path(linkDb, LOCK_NAME);
+    FileSystem fs = FileSystem.get(getConf());
+    LockUtil.createLockFile(fs, lock, force);
     if (LOG.isInfoEnabled()) {
       LOG.info("LinkDb: starting");
       LOG.info("LinkDb: linkdb: " + linkDb);
@@ -230,8 +227,12 @@
       }
       job.addInputPath(new Path(segments[i], ParseData.DIR_NAME));
     }
-    JobClient.runJob(job);
-    FileSystem fs = FileSystem.get(getConf());
+    try {
+      JobClient.runJob(job);
+    } catch (IOException e) {
+      LockUtil.removeLockFile(fs, lock);
+      throw e;
+    }
     if (fs.exists(linkDb)) {
       if (LOG.isInfoEnabled()) {
         LOG.info("LinkDb: merging with existing linkdb: " + linkDb);
@@ -241,7 +242,13 @@
       job = LinkDb.createMergeJob(getConf(), linkDb, normalize, filter);
       job.addInputPath(new Path(linkDb, CURRENT_NAME));
       job.addInputPath(newLinkDb);
-      JobClient.runJob(job);
+      try {
+        JobClient.runJob(job);
+      } catch (IOException e) {
+        LockUtil.removeLockFile(fs, lock);
+        fs.delete(newLinkDb);
+        throw e;
+      }
       fs.delete(newLinkDb);
     }
     LinkDb.install(job, linkDb);
@@ -257,8 +264,6 @@
     job.setJobName("linkdb " + linkDb);
 
     job.setInputFormat(SequenceFileInputFormat.class);
-    job.setInputKeyClass(Text.class);
-    job.setInputValueClass(ParseData.class);
 
     job.setMapperClass(LinkDb.class);
     // if we don't run the mergeJob, perform normalization/filtering now
@@ -293,8 +298,6 @@
     job.setJobName("linkdb merge " + linkDb);
 
     job.setInputFormat(SequenceFileInputFormat.class);
-    job.setInputKeyClass(Text.class);
-    job.setInputValueClass(Inlinks.class);
 
     job.setMapperClass(LinkDbFilter.class);
     job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
@@ -322,6 +325,7 @@
     fs.mkdirs(linkDb);
     fs.rename(newLinkDb, current);
     if (fs.exists(old)) fs.delete(old);
+    LockUtil.removeLockFile(fs, new Path(linkDb, LOCK_NAME));
   }
 
   public static void main(String[] args) throws Exception {
@@ -331,10 +335,11 @@
   
   public int run(String[] args) throws Exception {
     if (args.length < 2) {
-      System.err.println("Usage: LinkDb <linkdb> (-dir <segmentsDir> | <seg1> <seg2> ...) [-noNormalize] [-noFilter]");
+      System.err.println("Usage: LinkDb <linkdb> (-dir <segmentsDir> | <seg1> <seg2> ...) [-force] [-noNormalize] [-noFilter]");
       System.err.println("\tlinkdb\toutput LinkDb to create or update");
       System.err.println("\t-dir segmentsDir\tparent directory of several segments, OR");
       System.err.println("\tseg1 seg2 ...\t list of segment directories");
+      System.err.println("\t-force\tforce update even if LinkDb appears to be locked (CAUTION advised)");
       System.err.println("\t-noNormalize\tdon't normalize link URLs");
       System.err.println("\t-noFilter\tdon't apply URLFilters to link URLs");
       return -1;
@@ -345,6 +350,7 @@
     ArrayList segs = new ArrayList();
     boolean filter = true;
     boolean normalize = true;
+    boolean force = false;
     for (int i = 1; i < args.length; i++) {
       if (args[i].equals("-dir")) {
         segDir = new Path(args[++i]);
@@ -362,10 +368,12 @@
         normalize = false;
       } else if (args[i].equalsIgnoreCase("-noFilter")) {
         filter = false;
+      } else if (args[i].equalsIgnoreCase("-force")) {
+        force = true;
       } else segs.add(new Path(args[i]));
     }
     try {
-      invert(db, (Path[])segs.toArray(new Path[segs.size()]), normalize, filter);
+      invert(db, (Path[])segs.toArray(new Path[segs.size()]), normalize, filter, force);
       return 0;
     } catch (Exception e) {
       LOG.fatal("LinkDb: " + StringUtils.stringifyException(e));

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/crawl/MapWritable.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/crawl/MapWritable.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/crawl/MapWritable.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/crawl/MapWritable.java Wed Dec 27 16:03:04 2006
@@ -43,6 +43,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.protocol.ProtocolStatus;
 
 /**
  * A writable map, with a similar behavior as <code>java.util.HashMap</code>.
@@ -94,6 +95,7 @@
     addToMap(FloatWritable.class, new Byte((byte) -117));
     addToMap(IntWritable.class, new Byte((byte) -116));
     addToMap(ObjectWritable.class, new Byte((byte) -115));
+    addToMap(ProtocolStatus.class, new Byte((byte) -114));
 
   }
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/fetcher/Fetcher.java Wed Dec 27 16:03:04 2006
@@ -149,7 +149,7 @@
               switch(status.getCode()) {
 
               case ProtocolStatus.SUCCESS:        // got a page
-                pstatus = output(url, datum, content, CrawlDatum.STATUS_FETCH_SUCCESS);
+                pstatus = output(url, datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
                 updateStatus(content.getContent().length);
                 if (pstatus != null && pstatus.isSuccess() &&
                         pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
@@ -158,10 +158,17 @@
                   newUrl = this.urlFilters.filter(newUrl);
                   if (newUrl != null && !newUrl.equals(url.toString())) {
                     url = new Text(newUrl);
-                    redirecting = true;
-                    redirectCount++;
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug(" - content redirect to " + url);
+                    if (maxRedirect > 0) {
+                      redirecting = true;
+                      redirectCount++;
+                      if (LOG.isDebugEnabled()) {
+                        LOG.debug(" - content redirect to " + url + " (fetching now)");
+                      }
+                    } else {
+                      output(url, new CrawlDatum(), null, null, CrawlDatum.STATUS_FETCH_REDIR_TEMP);
+                      if (LOG.isDebugEnabled()) {
+                        LOG.debug(" - content redirect to " + url + " (fetching later)");
+                      }
                     }
                   } else if (LOG.isDebugEnabled()) {
                     LOG.debug(" - content redirect skipped: " +
@@ -172,15 +179,29 @@
 
               case ProtocolStatus.MOVED:         // redirect
               case ProtocolStatus.TEMP_MOVED:
+                int code;
+                if (status.getCode() == ProtocolStatus.MOVED) {
+                  code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
+                } else {
+                  code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
+                }
+                output(url, datum, content, status, code);
                 String newUrl = status.getMessage();
                 newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
                 newUrl = this.urlFilters.filter(newUrl);
                 if (newUrl != null && !newUrl.equals(url.toString())) {
                   url = new Text(newUrl);
-                  redirecting = true;
-                  redirectCount++;
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug(" - protocol redirect to " + url);
+                  if (maxRedirect > 0) {
+                    redirecting = true;
+                    redirectCount++;
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug(" - protocol redirect to " + url + " (fetching now)");
+                    }
+                  } else {
+                    output(url, new CrawlDatum(), null, null, code);
+                    if (LOG.isDebugEnabled()) {
+                      LOG.debug(" - protocol redirect to " + url + " (fetching later)");
+                    }
                   }
                 } else if (LOG.isDebugEnabled()) {
                   LOG.debug(" - protocol redirect skipped: " +
@@ -198,7 +219,7 @@
               // intermittent blocking - retry without increasing the counter
               case ProtocolStatus.WOULDBLOCK:
               case ProtocolStatus.BLOCKED:
-                output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
+                output(url, datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
                 break;
                 
               // permanent failures
@@ -207,21 +228,21 @@
               case ProtocolStatus.ACCESS_DENIED:
               case ProtocolStatus.ROBOTS_DENIED:
               case ProtocolStatus.NOTMODIFIED:
-                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+                output(url, datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
                 break;
 
               default:
                 if (LOG.isWarnEnabled()) {
                   LOG.warn("Unknown ProtocolStatus: " + status.getCode());
                 }
-                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+                output(url, datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
               }
 
               if (redirecting && redirectCount >= maxRedirect) {
                 if (LOG.isInfoEnabled()) {
                   LOG.info(" - redirect count exceeded " + url);
                 }
-                output(url, datum, null, CrawlDatum.STATUS_FETCH_GONE);
+                output(url, datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
               }
 
             } while (redirecting && (redirectCount < maxRedirect));
@@ -229,7 +250,7 @@
             
           } catch (Throwable t) {                 // unexpected exception
             logError(url, t.toString());
-            output(url, datum, null, CrawlDatum.STATUS_FETCH_RETRY);
+            output(url, datum, null, null, CrawlDatum.STATUS_FETCH_RETRY);
             
           }
         }
@@ -254,10 +275,11 @@
     }
 
     private ParseStatus output(Text key, CrawlDatum datum,
-                        Content content, int status) {
+                        Content content, ProtocolStatus pstatus, int status) {
 
       datum.setStatus(status);
       datum.setFetchTime(System.currentTimeMillis());
+      if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
 
       if (content == null) {
         String url = key.toString();

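A note on the new output() signature above: the extra ProtocolStatus argument is stored in the CrawlDatum metadata under Nutch.WRITABLE_PROTO_STATUS_KEY (see the Nutch.java change below), so downstream jobs can see the protocol-level outcome of each fetch. Below is a minimal sketch of reading it back from a CrawlDatum; the class and method names are illustrative only, not part of this patch:

    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.metadata.Nutch;
    import org.apache.nutch.protocol.ProtocolStatus;

    public class ProtoStatusExample {
      /** Returns the stored protocol status, or null for records written
       *  before this patch (the key is simply absent). */
      public static ProtocolStatus protoStatusOf(CrawlDatum datum) {
        return (ProtocolStatus) datum.getMetaData()
            .get(Nutch.WRITABLE_PROTO_STATUS_KEY);
      }
    }
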
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/indexer/Indexer.java Wed Dec 27 16:03:04 2006
@@ -41,6 +41,7 @@
 import org.apache.nutch.util.NutchJob;
 
 import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
 import org.apache.nutch.crawl.Inlinks;
 import org.apache.nutch.crawl.LinkDb;
 
@@ -286,7 +287,7 @@
       job.addInputPath(new Path(segments[i], ParseText.DIR_NAME));
     }
 
-    job.addInputPath(new Path(crawlDb, CrawlDatum.DB_DIR_NAME));
+    job.addInputPath(new Path(crawlDb, CrawlDb.CURRENT_NAME));
     job.addInputPath(new Path(linkDb, LinkDb.CURRENT_NAME));
 
     job.setInputFormat(InputFormat.class);

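The Indexer change above is part of a broader rename: the constant naming the crawldb's live subdirectory moved from CrawlDatum.DB_DIR_NAME to CrawlDb.CURRENT_NAME (the same rename appears in CrawlDbConverter and the tests below). A small sketch of resolving the directory with the new constant; the class and method names here are illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.crawl.CrawlDb;

    public class CrawlDbPathExample {
      // Resolve the live copy of the crawldb, e.g. <crawldb>/current.
      public static Path currentDir(Path crawlDb) {
        return new Path(crawlDb, CrawlDb.CURRENT_NAME);
      }
    }
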
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/metadata/Nutch.java Wed Dec 27 16:03:04 2006
@@ -16,6 +16,8 @@
  */
 package org.apache.nutch.metadata;
 
+import org.apache.hadoop.io.Text;
+
 
 /**
  * A collection of Nutch internal metadata constants.
@@ -36,5 +38,13 @@
   public static final String SEGMENT_NAME_KEY = "nutch.segment.name";
 
   public static final String SCORE_KEY = "nutch.crawl.score";
+
+  public static final String GENERATE_TIME_KEY = "_ngt_";
+
+  public static final Text WRITABLE_GENERATE_TIME_KEY = new Text(GENERATE_TIME_KEY);
+
+  public static final String PROTO_STATUS_KEY = "_pst_";
+
+  public static final Text WRITABLE_PROTO_STATUS_KEY = new Text(PROTO_STATUS_KEY);
 
 }

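The two new keys above come in both String and pre-constructed Text form so that per-record metadata lookups need not allocate a new Text each time. A sketch of stamping a CrawlDatum with the generate time, which the key name suggests is how the Generator marks selected records (the Generator-side code is not part of this excerpt, and the LongWritable value type is an assumption):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.metadata.Nutch;

    public class GenerateMarkExample {
      // Record the time at which this datum was selected for fetching.
      public static void markSelected(CrawlDatum datum, long generateTime) {
        datum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY,
            new LongWritable(generateTime));
      }
    }
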
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java Wed Dec 27 16:03:04 2006
@@ -95,7 +95,7 @@
       final Path segmentDumpFile = new Path(job.getOutputPath(), name);
 
       // Get the old copy out of the way
-      fs.delete(segmentDumpFile);
+      if (fs.exists(segmentDumpFile)) fs.delete(segmentDumpFile);
 
       final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
       return new RecordWriter() {

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/tools/compat/CrawlDbConverter.java Wed Dec 27 16:03:04 2006
@@ -104,13 +104,13 @@
     if (args.length == 0) {
       System.err.println("Usage: CrawlDbConverter <oldDb> <newDb> [-withMetadata]");
       System.err.println("\toldDb\tname of the crawldb that uses UTF8 class.");
-      System.err.println("\tnewDb\tname of the crawldb that will use Text class.");
-      System.err.println("\twithMetadata\tconvert also all metadata keys using UTF8 to Text.");
+      System.err.println("\tnewDb\tname of the output crawldb that will use Text class.");
+      System.err.println("\twithMetadata\tconvert also all metadata keys that use UTF8 to Text.");
       return -1;
     }
     JobConf job = new NutchJob(getConf());
     FileSystem fs = FileSystem.get(getConf());
-    Path oldDb = new Path(args[0], CrawlDatum.DB_DIR_NAME);
+    Path oldDb = new Path(args[0], CrawlDb.CURRENT_NAME);
     Path newDb =
       new Path(oldDb,
                Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

Added: lucene/nutch/trunk/src/java/org/apache/nutch/util/LockUtil.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/util/LockUtil.java?view=auto&rev=490607
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/util/LockUtil.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/util/LockUtil.java Wed Dec 27 16:03:04 2006
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Utility methods for handling application-level locking.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class LockUtil {
+  
+  /**
+   * Create a lock file.
+   * @param fs filesystem
+   * @param lockFile name of the lock file
+   * @param accept if true and the target file already exists, treat it as a
+   * valid lock. If false and the target file exists, an IOException is thrown.
+   * @throws IOException if accept is false and the target file already exists,
+   * or if the target file exists and is a directory.
+   */
+  public static void createLockFile(FileSystem fs, Path lockFile, boolean accept) throws IOException {
+    if (fs.exists(lockFile)) {
+      if (!accept)
+        throw new IOException("lock file " + lockFile + " already exists.");
+      if (fs.isDirectory(lockFile))
+        throw new IOException("lock file " + lockFile + " already exists and is a directory.");
+      // do nothing - the file already exists.
+    } else {
+      // make sure parents exist
+      fs.mkdirs(lockFile.getParent());
+      fs.createNewFile(lockFile);
+    }
+  }
+
+  /**
+   * Remove lock file. NOTE: applications enforce the semantics of this file -
+   * this method simply removes any file with a given name.
+   * @param fs filesystem
+   * @param lockFile lock file name
+   * @return false if the lock file doesn't exist; true if it existed and was
+   * successfully removed.
+   * @throws IOException if the lock file exists but is a directory.
+   */
+  public static boolean removeLockFile(FileSystem fs, Path lockFile) throws IOException {
+    if (!fs.exists(lockFile)) return false;
+    if (fs.isDirectory(lockFile))
+      throw new IOException("lock file " + lockFile + " exists but is a directory!");
+    return fs.delete(lockFile);
+  }
+}

Propchange: lucene/nutch/trunk/src/java/org/apache/nutch/util/LockUtil.java
------------------------------------------------------------------------------
    svn:eol-style = native

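A minimal usage sketch for the new LockUtil class, guarding a database update. The lock path ".locked" under the database directory is an assumption for illustration; each caller defines its own lock file:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.util.LockUtil;

    public class LockUsageExample {
      public static void updateWithLock(FileSystem fs, Path db, boolean force)
          throws IOException {
        Path lock = new Path(db, ".locked");
        // With force == false this fails fast if another job holds the lock;
        // with force == true an existing lock file is accepted as valid.
        LockUtil.createLockFile(fs, lock, force);
        try {
          // ... perform the update on db here ...
        } finally {
          LockUtil.removeLockFile(fs, lock);
        }
      }
    }
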
Modified: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDBTestUtil.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDBTestUtil.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDBTestUtil.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/CrawlDBTestUtil.java Wed Dec 27 16:03:04 2006
@@ -53,7 +53,7 @@
   public static void createCrawlDb(FileSystem fs, Path crawldb, List<URLCrawlDatum> init)
       throws Exception {
     LOG.trace("* creating crawldb: " + crawldb);
-    Path dir = new Path(crawldb, CrawlDatum.DB_DIR_NAME);
+    Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
     MapFile.Writer writer = new MapFile.Writer(fs, new Path(dir, "part-00000")
         .toString(), Text.class, CrawlDatum.class);
     Iterator<URLCrawlDatum> it = init.iterator();

Modified: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestCrawlDbMerger.java Wed Dec 27 16:03:04 2006
@@ -125,7 +125,7 @@
   
   private void createCrawlDb(FileSystem fs, Path crawldb, TreeSet init, CrawlDatum cd) throws Exception {
     LOG.fine("* creating crawldb: " + crawldb);
-    Path dir = new Path(crawldb, CrawlDatum.DB_DIR_NAME);
+    Path dir = new Path(crawldb, CrawlDb.CURRENT_NAME);
     MapFile.Writer writer = new MapFile.Writer(fs, new Path(dir, "part-00000").toString(), Text.class, CrawlDatum.class);
     Iterator it = init.iterator();
     while (it.hasNext()) {

Modified: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestGenerator.java Wed Dec 27 16:03:04 2006
@@ -259,20 +259,14 @@
     Path generatedSegment = generateFetchlist(Integer.MAX_VALUE,
         myConfiguration, true);
 
-    Path fetchlistPath = new Path(new Path(generatedSegment,
-        CrawlDatum.GENERATE_DIR_NAME), "part-00000");
-
-    ArrayList<URLCrawlDatum> fetchList = readContents(fetchlistPath);
-
-    // verify all got filtered out
-    assertEquals(0, fetchList.size());
+    assertNull("should be null (0 entries)", generatedSegment);
 
     generatedSegment = generateFetchlist(Integer.MAX_VALUE, myConfiguration, false);
 
-    fetchlistPath = new Path(new Path(generatedSegment,
+    Path fetchlistPath = new Path(new Path(generatedSegment,
         CrawlDatum.GENERATE_DIR_NAME), "part-00000");
 
-    fetchList = readContents(fetchlistPath);
+    ArrayList<URLCrawlDatum> fetchList = readContents(fetchlistPath);
 
     // verify nothing got filtered
     assertEquals(list.size(), fetchList.size());
@@ -317,7 +311,7 @@
     // generate segment
     Generator g = new Generator(config);
     Path generatedSegment = g.generate(dbDir, segmentsDir, -1, numResults,
-        Long.MAX_VALUE, filter);
+        Long.MAX_VALUE, filter, false);
     return generatedSegment;
   }
 

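The test changes above and in TestFetcher below track a new trailing boolean parameter on Generator.generate(); both tests pass false. Note also that the test above now expects generate() to return null for a fetchlist with zero entries, where it previously wrote an empty segment. The new parameter's name and exact meaning are not visible in this excerpt, so the sketch below labels it as an assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.crawl.Generator;

    public class GenerateCallExample {
      // Mirrors the call shape in the updated tests; may return null when
      // every record is filtered out of the fetchlist.
      public static Path generateAll(Configuration conf, Path dbDir,
          Path segmentsDir) throws Exception {
        Generator g = new Generator(conf);
        return g.generate(dbDir, segmentsDir, -1, Long.MAX_VALUE,
            Long.MAX_VALUE, false /* filter */,
            false /* new flag added by this patch (name assumed) */);
      }
    }
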
Modified: lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/crawl/TestInjector.java Wed Dec 27 16:03:04 2006
@@ -105,7 +105,7 @@
   }
   
   private List<String> readCrawldb() throws IOException{
-    Path dbfile=new Path(crawldbPath,CrawlDatum.DB_DIR_NAME + "/part-00000/data");
+    Path dbfile=new Path(crawldbPath,CrawlDb.CURRENT_NAME + "/part-00000/data");
     System.out.println("reading:" + dbfile);
     SequenceFile.Reader reader=new SequenceFile.Reader(fs, dbfile, conf);
     ArrayList<String> read=new ArrayList<String>();

Modified: lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java?view=diff&rev=490607&r1=490606&r2=490607
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/fetcher/TestFetcher.java Wed Dec 27 16:03:04 2006
@@ -88,7 +88,7 @@
     //generate
     Generator g=new Generator(conf);
     Path generatedSegment = g.generate(crawldbPath, segmentsPath, 1,
-        Long.MAX_VALUE, Long.MAX_VALUE, false);
+        Long.MAX_VALUE, Long.MAX_VALUE, false, false);
 
     long time=System.currentTimeMillis();
     //fetch