You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2014/02/28 16:44:22 UTC
svn commit: r1572975 - in /nutch/trunk: CHANGES.txt
src/java/org/apache/nutch/segment/SegmentMerger.java
src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java
Author: markus
Date: Fri Feb 28 15:44:21 2014
New Revision: 1572975
URL: http://svn.apache.org/r1572975
Log:
NUTCH-1113 SegmentMerger can now be safely used to merge segments. If this damn thing breaks again....
Added:
nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1572975&r1=1572974&r2=1572975&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri Feb 28 15:44:21 2014
@@ -2,6 +2,8 @@ Nutch Change Log
Nutch Development Trunk
+* NUTCH-1113 SegmentMerger can now be safely used to merge segments (Edward Drapkin, markus, snagel)
+
* NUTCH-1729 Upgrade to Tika 1.5 (jnioche)
* NUTCH-1707 DummyIndexingWriter (markus)
Modified: nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java?rev=1572975&r1=1572974&r2=1572975&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/segment/SegmentMerger.java Fri Feb 28 15:44:21 2014
@@ -408,14 +408,16 @@ public class SegmentMerger extends Confi
}
}
} else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
- if (lastF == null) {
- lastF = val;
- lastFname = sp.segmentName;
- } else {
- // only consider fetch status
- // https://issues.apache.org/jira/browse/NUTCH-1520
- if (CrawlDatum.hasFetchStatus(val)) {
- // take newer
+ // only consider fetch status and ignore fetch retry status
+ // https://issues.apache.org/jira/browse/NUTCH-1520
+ // https://issues.apache.org/jira/browse/NUTCH-1113
+ if (CrawlDatum.hasFetchStatus(val) &&
+ val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY &&
+ val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
+ if (lastF == null) {
+ lastF = val;
+ lastFname = sp.segmentName;
+ } else {
if (lastFname.compareTo(sp.segmentName) < 0) {
lastF = val;
lastFname = sp.segmentName;
Added: nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java?rev=1572975&view=auto
==============================================================================
--- nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java (added)
+++ nutch/trunk/src/test/org/apache/nutch/segment/TestSegmentMergerCrawlDatums.java Fri Feb 28 15:44:21 2014
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.text.DecimalFormat;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.util.NutchConfiguration;
+
+import junit.framework.TestCase;
+
+/**
+ * New SegmentMerger unit test focusing on several crappy issues with the segment
+ * merger. The general problem is disappearing records and incorrect CrawlDatum
+ * status values. This unit test performs random sequences of segment merging where
+ * we're looking for an expected status.
+ * A second test is able to randomly inject redirects into segments, likely causing
+ * the segment merger to fail resulting in a bad merged segment.
+ *
+ * See also:
+ *
+ * https://issues.apache.org/jira/browse/NUTCH-1113
+ * https://issues.apache.org/jira/browse/NUTCH-1616
+ * https://issues.apache.org/jira/browse/NUTCH-1520
+ *
+ * Cheers!
+ */
+public class TestSegmentMergerCrawlDatums extends TestCase {
+ Configuration conf;
+ FileSystem fs;
+ Random rnd;
+
+ public void setUp() throws Exception {
+ conf = NutchConfiguration.create();
+ fs = FileSystem.get(conf);
+ rnd = new Random();
+ }
+
+ /**
+ *
+ */
+ public void testSingleRandomSequence() throws Exception {
+ assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), new Byte(executeSequence(CrawlDatum.STATUS_FETCH_GONE, CrawlDatum.STATUS_FETCH_SUCCESS, 256, false)));
+ }
+
+ /**
+ *
+ */
+ public void testMostlyRedirects() throws Exception {
+ // Our test directory
+ Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+
+ Path segment1 = new Path(testDir, "20140110114943");
+ Path segment2 = new Path(testDir, "20140110114832");
+ Path segment3 = new Path(testDir, "20140110114558");
+ Path segment4 = new Path(testDir, "20140110114930");
+ Path segment5 = new Path(testDir, "20140110114545");
+ Path segment6 = new Path(testDir, "20140110114507");
+ Path segment7 = new Path(testDir, "20140110114903");
+ Path segment8 = new Path(testDir, "20140110114724");
+
+ createSegment(segment1, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+ createSegment(segment2, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+ createSegment(segment3, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+ createSegment(segment4, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+ createSegment(segment5, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+ createSegment(segment6, CrawlDatum.STATUS_FETCH_SUCCESS, false);
+ createSegment(segment7, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+ createSegment(segment8, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+
+ // Merge the segments and get status
+ Path mergedSegment = merge(testDir, new Path[]{segment1, segment2, segment3, segment4, segment5, segment6, segment7, segment8});
+ Byte status = new Byte(status = checkMergedSegment(testDir, mergedSegment));
+
+ assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), status);
+ }
+
+ /**
+ *
+ */
+ public void testRandomizedSequences() throws Exception {
+ for (int i = 0; i < rnd.nextInt(16) + 16; i++) {
+ byte expectedStatus = (byte)(rnd.nextInt(6) + 0x21);
+ byte randomStatus = (byte)(rnd.nextInt(6) + 0x21);
+
+ assertEquals(new Byte(expectedStatus), new Byte(executeSequence(randomStatus, expectedStatus, rnd.nextInt(16) + 32, rnd.nextBoolean())));
+ }
+ }
+
+ /**
+ *
+ */
+ public void testRandomTestSequenceWithRedirects() throws Exception {
+ assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), new Byte(executeSequence(CrawlDatum.STATUS_FETCH_GONE, CrawlDatum.STATUS_FETCH_SUCCESS, 128, true)));
+ }
+
+ /**
+ * Check a fixed sequence!
+ */
+ public void testFixedSequence() throws Exception {
+ // Our test directory
+ Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+
+ Path segment1 = new Path(testDir, "00001");
+ Path segment2 = new Path(testDir, "00002");
+ Path segment3 = new Path(testDir, "00003");
+
+ createSegment(segment1, CrawlDatum.STATUS_FETCH_GONE, false);
+ createSegment(segment2, CrawlDatum.STATUS_FETCH_GONE, true);
+ createSegment(segment3, CrawlDatum.STATUS_FETCH_SUCCESS, false);
+
+ // Merge the segments and get status
+ Path mergedSegment = merge(testDir, new Path[]{segment1, segment2, segment3});
+ Byte status = new Byte(status = checkMergedSegment(testDir, mergedSegment));
+
+ assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), status);
+ }
+
+ /**
+ * Check a fixed sequence!
+ */
+ public void testRedirFetchInOneSegment() throws Exception {
+ // Our test directory
+ Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+
+ Path segment1 = new Path(testDir, "00001");
+ Path segment2 = new Path(testDir, "00002");
+ Path segment3 = new Path(testDir, "00003");
+
+ createSegment(segment3, CrawlDatum.STATUS_FETCH_SUCCESS, true, true);
+
+ // Merge the segments and get status
+ Path mergedSegment = merge(testDir, new Path[]{segment3});
+ Byte status = new Byte(status = checkMergedSegment(testDir, mergedSegment));
+
+ assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), status);
+ }
+
+ /**
+ * Check a fixed sequence!
+ */
+ public void testEndsWithRedirect() throws Exception {
+ // Our test directory
+ Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+
+ Path segment1 = new Path(testDir, "00001");
+ Path segment2 = new Path(testDir, "00002");
+
+ createSegment(segment1, CrawlDatum.STATUS_FETCH_SUCCESS, false);
+ createSegment(segment2, CrawlDatum.STATUS_FETCH_SUCCESS, true);
+
+ // Merge the segments and get status
+ Path mergedSegment = merge(testDir, new Path[]{segment1, segment2});
+ Byte status = new Byte(status = checkMergedSegment(testDir, mergedSegment));
+
+ assertEquals(new Byte(CrawlDatum.STATUS_FETCH_SUCCESS), status);
+ }
+
+ /**
+ * Execute a sequence of creating segments, merging them and checking the final output
+ *
+ * @param firstSatus the status to start with
+ * @param lastStatus the status to end with
+ * @param rounds the number of rounds
+ * @param redirect whether redirects are injected randomly
+ * @return the CrawlDatum status
+ */
+ protected byte executeSequence(byte firstSatus, byte lastStatus, int rounds, boolean redirect) throws Exception {
+ // Our test directory
+ Path testDir = new Path(conf.get("hadoop.tmp.dir"), "merge-" + System.currentTimeMillis());
+
+ // Format for the segments
+ DecimalFormat df = new DecimalFormat("0000000");
+
+ // Create our segment paths
+ Path[] segmentPaths = new Path[rounds];
+ for (int i = 0; i < rounds; i++) {
+ String segmentName = df.format(i);
+ segmentPaths[i] = new Path(testDir, segmentName);
+ }
+
+ // Create the first segment according to the specified status
+ createSegment(segmentPaths[0], firstSatus, false);
+
+ // Create N segments with random status and optionally with randomized redirect injection
+ for (int i = 1; i < rounds - 1; i++) {
+ // Status, 6 possibilities incremented with 33 hex
+ byte status = (byte)(rnd.nextInt(6) + 0x21);
+
+ // Whether this is going to be a redirect
+ boolean needsToRedirect = redirect ? rnd.nextBoolean() : false;
+ boolean redirectAndFetch = redirect ? rnd.nextBoolean() : false;
+
+ createSegment(segmentPaths[i], status, needsToRedirect, redirectAndFetch);
+ }
+
+ // Create the last segment according to the specified status
+ createSegment(segmentPaths[rounds - 1], lastStatus, redirect ? rnd.nextBoolean() : false, redirect ? rnd.nextBoolean() : false);
+
+ // Merge the segments!
+ Path mergedSegment = merge(testDir, segmentPaths);
+
+ // Check the status of the final record and return it
+ return checkMergedSegment(testDir, mergedSegment);
+ }
+
+ /**
+ * Checks the merged segment and removes the stuff again.
+ *
+ * @param testDir the test directory
+ * @param mergedSegment the merged segment
+ * @return the final status
+ */
+ protected byte checkMergedSegment(Path testDir, Path mergedSegment) throws Exception {
+ // Get a MapFile reader for the <Text,CrawlDatum> pairs
+ MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(mergedSegment, CrawlDatum.FETCH_DIR_NAME), conf);
+
+ Text key = new Text();
+ CrawlDatum value = new CrawlDatum();
+ byte finalStatus = 0x0;
+
+ for (MapFile.Reader reader : readers) {
+ while (reader.next(key, value)) {
+ System.out.println("Reading status for: " + key.toString() + " > " + CrawlDatum.getStatusName(value.getStatus()));
+
+ // Only consider fetch status
+ if (CrawlDatum.hasFetchStatus(value) && key.toString().equals("http://nutch.apache.org/")) {
+ finalStatus = value.getStatus();
+ }
+ }
+
+ // Close the reader again
+ reader.close();
+ }
+
+ // Remove the test directory again
+ fs.delete(testDir, true);
+
+ System.out.println("Final fetch status for: http://nutch.apache.org/ > " + CrawlDatum.getStatusName(finalStatus));
+
+ // Return the final status
+ return finalStatus;
+ }
+
+ /**
+ * Merge some segments!
+ *
+ * @param testDir the test directory
+ * @param segments the segments to merge
+ * @return Path to the merged segment
+ */
+ protected Path merge(Path testDir, Path[] segments) throws Exception {
+ // Our merged output directory
+ Path out = new Path(testDir, "out");
+
+ // Merge
+ SegmentMerger merger = new SegmentMerger(conf);
+ merger.merge(out, segments, false, false, -1);
+
+ FileStatus[] stats = fs.listStatus(out);
+ assertEquals(1, stats.length);
+
+ return stats[0].getPath();
+ }
+
+ /**
+ * Create a segment with the specified status.
+ *
+ * @param segment the segment's path
+ * @param status the status of the record, ignored if redirect is true
+ * @param redirect whether we're doing a redirect as well
+ */
+ protected void createSegment(Path segment, byte status, boolean redirect) throws Exception {
+ if (redirect) {
+ createSegment(segment, status, false, true);
+ } else {
+ createSegment(segment, status, true, false);
+ }
+ }
+
+ protected void createSegment(Path segment, byte status, boolean fetch, boolean redirect) throws Exception {
+ System.out.println("\nSegment: " + segment.toString());
+
+ // The URL of our main record
+ String url = "http://nutch.apache.org/";
+
+ // The URL of our redirecting URL
+ String redirectUrl = "http://nutch.apache.org/i_redirect_to_the_root/";
+
+ // Our value
+ CrawlDatum value = new CrawlDatum();
+
+ // Path of the segment's crawl_fetch directory
+ Path crawlFetchPath = new Path(new Path(segment, CrawlDatum.FETCH_DIR_NAME), "part-00000");
+
+ // Get a writer for map files containing <Text,CrawlDatum> pairs
+ MapFile.Writer writer = new MapFile.Writer(conf, fs, crawlFetchPath.toString(), Text.class, CrawlDatum.class);
+
+ // Whether we're handling a redirect now
+ // first add the linked datum
+ // - before redirect status because url sorts before redirectUrl
+ // - before fetch status to check whether fetch datum is preferred over linked datum when merging
+ if (redirect) {
+ // We're writing out our main record URL with status linked
+ System.out.println(url + " > " + CrawlDatum.getStatusName(CrawlDatum.STATUS_LINKED));
+ value = new CrawlDatum();
+ value.setStatus(CrawlDatum.STATUS_LINKED);
+ writer.append(new Text(url), value);
+ }
+
+ // Whether we're fetching now
+ if (fetch) {
+ System.out.println(url + " > " + CrawlDatum.getStatusName(status));
+
+ // Set the status
+ value.setStatus(status);
+
+ // Write the pair and ok
+ writer.append(new Text(url), value);
+ }
+
+ // Whether we're handling a redirect now
+ if (redirect) {
+ // And the redirect URL with redirect status, pointing to our main URL
+ System.out.println(redirectUrl + " > " + CrawlDatum.getStatusName(CrawlDatum.STATUS_FETCH_REDIR_TEMP));
+ value.setStatus(CrawlDatum.STATUS_FETCH_REDIR_TEMP);
+ writer.append(new Text(redirectUrl), value);
+ }
+
+ // Close the stuff
+ writer.close();
+ }
+
+}