You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/11/15 19:00:16 UTC

svn commit: r344403 - in /lucene/nutch/branches/mapred: conf/nutch-default.xml src/java/org/apache/nutch/fs/ChecksumException.java src/java/org/apache/nutch/fs/NFSDataInputStream.java src/java/org/apache/nutch/io/SequenceFile.java

Author: cutting
Date: Tue Nov 15 10:00:14 2005
New Revision: 344403

URL: http://svn.apache.org/viewcvs?rev=344403&view=rev
Log:
Add ability to skip over data with bad checksums.

Added:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java
Modified:
    lucene/nutch/branches/mapred/conf/nutch-default.xml
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java

Modified: lucene/nutch/branches/mapred/conf/nutch-default.xml
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/conf/nutch-default.xml?rev=344403&r1=344402&r2=344403&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/conf/nutch-default.xml (original)
+++ lucene/nutch/branches/mapred/conf/nutch-default.xml Tue Nov 15 10:00:14 2005
@@ -339,6 +339,21 @@
   buffered during read and write operations.</description>
 </property>
   
+<property>
+  <name>io.bytes.per.checksum</name>
+  <value>512</value>
+  <description>The number of bytes per checksum.  Must not be larger than
+  io.file.buffer.size.</description>
+</property>
+
+<property>
+  <name>io.skip.checksum.errors</name>
+  <value>false</value>
+  <description>If true, when a checksum error is encountered while
+  reading a sequence file, entries are skipped, instead of throwing an
+  exception.</description>
+</property>
+  
 <!-- file system properties -->
 
 <property>

Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java?rev=344403&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java Tue Nov 15 10:00:14 2005
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.fs;
+
+import java.io.IOException;
+
+/** Thrown for checksum errors.  Extends IOException, so code that catches IOException also catches this. */
+public class ChecksumException extends IOException {
+  public ChecksumException(String description) {  // description is passed on as the exception message
+    super(description);
+  }
+}

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java?rev=344403&r1=344402&r2=344403&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java Tue Nov 15 10:00:14 2005
@@ -109,13 +109,15 @@
         stopSumming();
         return;
       }
-      if (crc != (int)sum.getValue()) {
-        fs.reportChecksumFailure(file, (NFSInputStream)in,
-                                 getPos()-delta, bytesPerSum, crc);
-        throw new IOException("Checksum error: "+file);
-      }
+      int sumValue = (int)sum.getValue();
       sum.reset();
       inSum = 0;
+      if (crc != sumValue) {
+        long pos = getPos() - delta;
+        fs.reportChecksumFailure(file, (NFSInputStream)in,
+                                 pos, bytesPerSum, crc);
+        throw new ChecksumException("Checksum error: "+file+" at "+pos);
+      }
     }
 
     public long getPos() throws IOException {

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=344403&r1=344402&r2=344403&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Tue Nov 15 10:00:14 2005
@@ -359,23 +359,39 @@
       if (in.getPos() >= end)
         return -1;
 
-      int length = in.readInt();
+      try {
+        int length = in.readInt();
 
-      if (version[3] > 1 && sync != null &&
-          length == SYNC_ESCAPE) {                // process a sync entry
-        //LOG.info("sync@"+in.getPos());
-        in.readFully(syncCheck);                  // read syncCheck
-        if (!Arrays.equals(sync, syncCheck))      // check it
-          throw new IOException("File is corrupt!");
-        syncSeen = true;
-        length = in.readInt();                    // re-read length
-      } else {
-        syncSeen = false;
+        if (version[3] > 1 && sync != null &&
+            length == SYNC_ESCAPE) {              // process a sync entry
+          //LOG.info("sync@"+in.getPos());
+          in.readFully(syncCheck);                // read syncCheck
+          if (!Arrays.equals(sync, syncCheck))    // check it
+            throw new IOException("File is corrupt!");
+          syncSeen = true;
+          length = in.readInt();                  // re-read length
+        } else {
+          syncSeen = false;
+        }
+        
+        int keyLength = in.readInt();
+        buffer.write(in, length);
+        return keyLength;
+
+      } catch (ChecksumException e) {             // checksum failure
+        handleChecksumException(e);
+        return next(buffer);
       }
+    }
 
-      int keyLength = in.readInt();
-      buffer.write(in, length);
-      return keyLength;
+    private void handleChecksumException(ChecksumException e)  // either skips ahead via sync() or rethrows e
+      throws IOException {
+      if (NutchConf.get().getBoolean("io.skip.checksum.errors", false)) {  // skipping is opt-in; off by default
+        LOG.warning("Bad checksum at "+getPosition()+". Skipping entries.");
+        sync(getPosition()+NutchConf.get().getInt("io.bytes.per.checksum", 512));  // re-sync from current position + bytes-per-checksum (default 512)
+      } else {
+        throw e;  // skipping not enabled: propagate the original exception unchanged
+      }
+    }
 
     /** Set the current byte position in the input file. */
@@ -390,20 +406,24 @@
         return;
       }
 
-      seek(position+4);                           // skip escape
-      in.readFully(syncCheck);
-      int syncLen = sync.length;
-      for (int i = 0; in.getPos() < end; i++) {
-        int j = 0;
-        for (; j < syncLen; j++) {
-          if (sync[j] != syncCheck[(i+j)%syncLen])
-            break;
-        }
-        if (j == syncLen) {
-          in.seek(in.getPos() - SYNC_SIZE);  // position before sync
-          return;
+      try {
+        seek(position+4);                         // skip escape
+        in.readFully(syncCheck);
+        int syncLen = sync.length;
+        for (int i = 0; in.getPos() < end; i++) {
+          int j = 0;
+          for (; j < syncLen; j++) {
+            if (sync[j] != syncCheck[(i+j)%syncLen])
+              break;
+          }
+          if (j == syncLen) {
+            in.seek(in.getPos() - SYNC_SIZE);     // position before sync
+            return;
+          }
+          syncCheck[i%syncLen] = in.readByte();
         }
-        syncCheck[i%syncLen] = in.readByte();
+      } catch (ChecksumException e) {             // checksum failure
+        handleChecksumException(e);
       }
     }