You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2014/02/28 16:44:22 UTC

svn commit: r1572975 - in /nutch/trunk: CHANGES.txt src/java/org/apache/nutch/segment/SegmentMerger.java src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java

Author: markus
Date: Fri Feb 28 15:44:21 2014
New Revision: 1572975

URL: http://svn.apache.org/r1572975
Log:
NUTCH-1113 SegmentMerger can now be safely used to merge segments. If this damn thing breaks again....

Added:
    nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java
Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1572975&r1=1572974&r2=1572975&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri Feb 28 15:44:21 2014
@@ -2,6 +2,8 @@ Nutch Change Log
 
 Nutch Development Trunk
 
+* NUTCH-1113 SegmentMerger can now be safely used to merge segments (Edward Drapkin, markus, snagel)
+
 * NUTCH-1729 Upgrade to Tika 1.5 (jnioche)
 
 * NUTCH-1707 DummyIndexingWriter (markus)

Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?rev=1572975&r1=1572974&r2=1572975&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Fri Feb 28 15:44:21 2014
@@ -408,14 +408,16 @@ public class SegmentMerger extends Confi
             }
           }
         } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
-          if (lastF == null) {
-            lastF = val;
-            lastFname = sp.segmentName;
-          } else {
-            // only consider fetch status
-            // https://issues.apache.org/jira/browse/NUTCH-1520
-            if (CrawlDatum.hasFetchStatus(val)) {
-              // take newer
+          // only consider fetch status, ignoring fetch retry and not-modified status
+          // https://issues.apache.org/jira/browse/NUTCH-1520
+          // https://issues.apache.org/jira/browse/NUTCH-1113
+          if (CrawlDatum.hasFetchStatus(val) &&
+            val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY &&
+            val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
+            if (lastF == null) {
+              lastF = val;
+              lastFname = sp.segmentName;
+            } else {
               if (lastFname.compareTo(sp.segmentName) < 0) {
                 lastF = val;
                 lastFname = sp.segmentName;

Added: nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java?rev=1572975&view=auto
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java (added)
+++ nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java Fri Feb 28 15:44:21 2014
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.text.DecimalFormat;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.util.NutchConfiguration;
+
+import junit.framework.TestCase;
+
+/**
+ * New SegmentMerger unit test focussing on several crappy issues with the segment
+ * merger. The general problem is disappearing records and incorrect CrawlDatum
+ * status values. This unit test performs random sequences of segment merging where
+ * we're looking for an expected status.
+ * A second test randomly injects redirects into the segments, which is likely to
+ * cause the segment merger to fail, resulting in a bad merged segment.
+ *
+ * See also:
+ *
+ * https://issues.apache.org/jira/browse/NUTCH-1113
+ * https://issues.apache.org/jira/browse/NUTCH-1616
+ * https://issues.apache.org/jira/browse/NUTCH-1520
+ *
+ * Cheers!
+ */
+public class TestSegmentMergerCrawlDatums extends TestCase {
+  Configuration conf;
+  FileSystem fs;
+  Random rnd;
+  
+  public void setUp() throws Exception {
+    conf = NutchConfiguration.create();
+    fs = FileSystem.get(conf);
+    rnd = new Random();
+  }
+  
+  /**
+   *
+   */
+  public void testSingleRandomSequence() throws Exception {
+    assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), new Byte(executeSequence(CrawlDatum.STATUS_FETCH_GONE, CrawlDatum.STATUS_FETCH_SUCCESS, 256, false)));
+  }
+  
+  /**
+   *
+   */
+  public void testMostlyRedirects() throws Exception {
+    // Our test directory
+    Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+    
+    Path segment1 = new Path(testDir, "20140110114943");
+    Path segment2 = new Path(testDir, "20140110114832");
+    Path segment3 = new Path(testDir, "20140110114558");
+    Path segment4 = new Path(testDir, "20140110114930");
+    Path segment5 = new Path(testDir, "20140110114545");
+    Path segment6 = new Path(testDir, "20140110114507");
+    Path segment7 = new Path(testDir, "20140110114903");
+    Path segment8 = new Path(testDir, "20140110114724");
+    
+    createSegment(segment1, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+    createSegment(segment2, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+    createSegment(segment3, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+    createSegment(segment4, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+    createSegment(segment5, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+    createSegment(segment6, CrawlDatum.STATUS_FETCH_SUCCESS, false);
+    createSegment(segment7, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+    createSegment(segment8, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+    
+    // Merge the segments and get status
+    Path mergedSegment = merge(testDir, new Path[]{segment1, segment2, segment3, segment4, segment5, segment6, segment7, segment8});
+    Byte status = new Byte(status = checkMergedSegment(testDir, mergedSegment));
+    
+    assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), status);
+  }
+  
+  /**
+   *
+   */
+  public void testRandomizedSequences() throws Exception {
+    for (int i = 0; i < rnd.nextInt(16) + 16; i++) {
+      byte expectedStatus = (byte)(rnd.nextInt(6) + 0x21);
+      byte randomStatus = (byte)(rnd.nextInt(6) + 0x21);
+      
+      assertEquals(new Byte(expectedStatus), new Byte(executeSequence(randomStatus, expectedStatus, rnd.nextInt(16) + 32, rnd.nextBoolean())));
+    }
+  }
+  
+  /**
+   *
+   */
+  public void testRandomTestSequenceWithRedirects() throws Exception {
+    assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), new Byte(executeSequence(CrawlDatum.STATUS_FETCH_GONE, CrawlDatum.STATUS_FETCH_SUCCESS, 128, true)));
+  }
+  
+  /**
+   * Check a fixed sequence!
+   */
+  public void testFixedSequence() throws Exception {
+    // Our test directory
+    Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+    
+    Path segment1 = new Path(testDir, "00001");
+    Path segment2 = new Path(testDir, "00002");
+    Path segment3 = new Path(testDir, "00003");
+    
+    createSegment(segment1, CrawlDatum.STATUS_FETCH_GONE, false);
+    createSegment(segment2, CrawlDatum.STATUS_FETCH_GONE, true);
+    createSegment(segment3, CrawlDatum.STATUS_FETCH_SUCCESS, false);
+    
+    // Merge the segments and get status
+    Path mergedSegment = merge(testDir, new Path[]{segment1, segment2, segment3});
+    Byte status = new Byte(status = checkMergedSegment(testDir, mergedSegment));
+    
+    assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), status);
+  }
+  
+  /**
+   * Check a single segment that contains both a fetch and a redirect!
+   */
+  public void testRedirFetchInOneSegment() throws Exception {
+    // Our test directory
+    Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+    
+    Path segment1 = new Path(testDir, "00001");
+    Path segment2 = new Path(testDir, "00002");
+    Path segment3 = new Path(testDir, "00003");
+    
+    createSegment(segment3, CrawlDatum.STATUS_FETCH_SUCCESS, true, true);
+    
+    // Merge the segments and get status
+    Path mergedSegment = merge(testDir, new Path[]{segment3});
+    Byte status = new Byte(status = checkMergedSegment(testDir, mergedSegment));
+    
+    assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), status);
+  }
+  
+  /**
+   * Check a sequence in which the last segment only contains a redirect!
+   */
+  public void testEndsWithRedirect() throws Exception {
+    // Our test directory
+    Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+    
+    Path segment1 = new Path(testDir, "00001");
+    Path segment2 = new Path(testDir, "00002");
+    
+    createSegment(segment1, CrawlDatum.STATUS_FETCH_SUCCESS, false);
+    createSegment(segment2, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+    
+    // Merge the segments and get status
+    Path mergedSegment = merge(testDir, new Path[]{segment1, segment2});
+    Byte status = new Byte(status = checkMergedSegment(testDir, mergedSegment));
+    
+    assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), status);
+  }
+  
+  /**
+   * Execute a sequence of creating segments, merging them and checking the final output
+   *
+   * @param firstSatus status to start with
+   * @param lastStatus status to end with
+   * @param rounds number of rounds
+   * @param redirect whether redirects are injected randomly
+   * @return the CrawlDatum status
+   */
+  protected byte executeSequence(byte firstSatus, byte lastStatus, int rounds, boolean redirect) throws Exception {
+    // Our test directory
+    Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+    
+    // Format for the segments
+    DecimalFormat df = new DecimalFormat("0000000");
+    
+    // Create our segment paths
+    Path[] segmentPaths = new Path[rounds];
+    for (int i = 0; i < rounds; i++) {
+      String segmentName = df.format(i);
+      segmentPaths[i] = new Path(testDir, segmentName);
+    }
+       
+    // Create the first segment according to the specified status
+    createSegment(segmentPaths[0], firstSatus, false);
+    
+    // Create N segments with random status and optionally with randomized redirect injection
+    for (int i = 1; i < rounds - 1; i++) {
+      // Status, 6 possibilities offset by 0x21 (33 decimal)
+      byte status = (byte)(rnd.nextInt(6) + 0x21);
+      
+      // Whether this is going to be a redirect
+      boolean needsToRedirect = redirect ? rnd.nextBoolean() : false;
+      boolean redirectAndFetch = redirect ? rnd.nextBoolean() : false;
+      
+      createSegment(segmentPaths[i], status, needsToRedirect, redirectAndFetch);
+    }
+
+    // Create the last segment according to the specified status
+    createSegment(segmentPaths[rounds - 1], lastStatus, redirect ? rnd.nextBoolean() : false, redirect ? rnd.nextBoolean() : false);    
+    
+    // Merge the segments!
+    Path mergedSegment = merge(testDir, segmentPaths);
+    
+    // Check the status of the final record and return it
+    return checkMergedSegment(testDir, mergedSegment);
+  }
+  
+  /**
+   * Checks the merged segment and removes the stuff again.
+   *
+   * @param testDir the test directory
+   * @param mergedSegment the merged segment
+   * @return the final status
+   */
+  protected byte checkMergedSegment(Path testDir, Path mergedSegment) throws Exception  {
+    // Get a MapFile reader for the <Text,CrawlDatum> pairs
+    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(mergedSegment, CrawlDatum.FETCH_DIR_NAME), conf);
+    
+    Text key = new Text();
+    CrawlDatum value = new CrawlDatum();
+    byte finalStatus = 0x0;
+    
+    for (MapFile.Reader reader : readers) {
+      while (reader.next(key, value)) {
+        System.out.println("Reading status for: " + key.toString() + " > " + CrawlDatum.getStatusName(value.getStatus()));
+        
+        // Only consider fetch status
+        if (CrawlDatum.hasFetchStatus(value) && key.toString().equals("http://nutch.apache.org/")) {
+          finalStatus = value.getStatus();
+        }
+      }
+      
+      // Close the reader again
+      reader.close();
+    }
+
+    // Remove the test directory again
+    fs.delete(testDir, true);
+    
+    System.out.println("Final fetch status for: http://nutch.apache.org/ > " + CrawlDatum.getStatusName(finalStatus));
+
+    // Return the final status
+    return finalStatus;
+  }
+  
+  /**
+   * Merge some segments!
+   *
+   * @param testDir the test directory
+   * @param segments the segments to merge
+   * @return Path to the merged segment
+   */
+  protected Path merge(Path testDir, Path[] segments) throws Exception {
+    // Our merged output directory
+    Path out = new Path(testDir, "out");
+    
+    // Merge
+    SegmentMerger merger = new SegmentMerger(conf);
+    merger.merge(out, segments, false, false, -1);
+
+    FileStatus[] stats = fs.listStatus(out);
+    assertEquals(1, stats.length);
+    
+    return stats[0].getPath();
+  }
+  
+  /**
+   * Create a segment with the specified status.
+   *
+   * @param segment the segment's path
+   * @param status the status of the record, ignored if redirect is true
+   * @param redirect whether to write a redirect instead of a fetch
+   */
+  protected void createSegment(Path segment, byte status, boolean redirect) throws Exception {
+    if (redirect) {
+      createSegment(segment, status, false, true);
+    } else {
+      createSegment(segment, status, true, false);
+    }
+  }
+
+  protected void createSegment(Path segment, byte status, boolean fetch, boolean redirect) throws Exception {
+    System.out.println("\nSegment: " + segment.toString());
+    
+    // The URL of our main record
+    String url = "http://nutch.apache.org/";
+    
+    // The URL of our redirecting URL
+    String redirectUrl = "http://nutch.apache.org/i_redirect_to_the_root/";
+    
+    // Our value
+    CrawlDatum value = new CrawlDatum();
+    
+    // Path of the segment's crawl_fetch directory
+    Path crawlFetchPath = new Path(new Path(segment, CrawlDatum.FETCH_DIR_NAME), "part-00000");
+
+    // Get a writer for map files containing <Text,CrawlDatum> pairs
+    MapFile.Writer writer = new MapFile.Writer(conf, fs, crawlFetchPath.toString(), Text.class, CrawlDatum.class);
+
+    // Whether we're handling a redirect now
+    // first add the linked datum
+    // - before redirect status because url sorts before redirectUrl
+    // - before fetch status to check whether fetch datum is preferred over linked datum when merging
+    if (redirect) {
+      // We're writing out our main record URL with status linked
+      System.out.println(url + " > " + CrawlDatum.getStatusName(CrawlDatum.STATUS_LINKED));
+      value = new CrawlDatum();
+      value.setStatus(CrawlDatum.STATUS_LINKED);
+      writer.append(new Text(url), value);
+    }
+
+    // Whether we're fetching now
+    if (fetch) {
+      System.out.println(url + " > " + CrawlDatum.getStatusName(status));
+      
+      // Set the status
+      value.setStatus(status);
+
+      // Write the pair and ok
+      writer.append(new Text(url), value);
+    }
+
+    // Whether we're handling a redirect now
+    if (redirect) {
+      // And the redirect URL with redirect status, pointing to our main URL
+      System.out.println(redirectUrl + " > " + CrawlDatum.getStatusName(CrawlDatum.STATUS_FETCH_REDIR_TEMP));
+      value.setStatus(CrawlDatum.STATUS_FETCH_REDIR_TEMP);
+      writer.append(new Text(redirectUrl), value);
+    }
+    
+    // Close the stuff
+    writer.close();
+  }
+
+}