You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/11/15 19:00:16 UTC
svn commit: r344403 - in /lucene/nutch/branches/mapred:
conf/nutch-default.xml src/java/org/apache/nutch/fs/ChecksumException.java
src/java/org/apache/nutch/fs/NFSDataInputStream.java
src/java/org/apache/nutch/io/SequenceFile.java
Author: cutting
Date: Tue Nov 15 10:00:14 2005
New Revision: 344403
URL: http://svn.apache.org/viewcvs?rev=344403&view=rev
Log:
Add ability to skip over data with bad checksums.
Added:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java
Modified:
lucene/nutch/branches/mapred/conf/nutch-default.xml
lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
Modified: lucene/nutch/branches/mapred/conf/nutch-default.xml
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/conf/nutch-default.xml?rev=344403&r1=344402&r2=344403&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/conf/nutch-default.xml (original)
+++ lucene/nutch/branches/mapred/conf/nutch-default.xml Tue Nov 15 10:00:14 2005
@@ -339,6 +339,21 @@
buffered during read and write operations.</description>
</property>
+<property>
+ <name>io.bytes.per.checksum</name>
+ <value>512</value>
+ <description>The number of bytes per checksum. Must not be larger than
+ io.file.buffer.size.</description>
+</property>
+
+<property>
+ <name>io.skip.checksum.errors</name>
+ <value>false</value>
+ <description>If true, when a checksum error is encountered while
+ reading a sequence file, entries are skipped, instead of throwing an
+ exception.</description>
+</property>
+
<!-- file system properties -->
<property>
Added: lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java?rev=344403&view=auto
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java (added)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/ChecksumException.java Tue Nov 15 10:00:14 2005
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.fs;
+
+import java.io.IOException;
+
+/** Thrown for checksum errors. Subclasses IOException, so code that catches IOException still sees it, while other code can catch it specifically. */
+public class ChecksumException extends IOException {
+ public ChecksumException(String description) { // description is passed on as the exception message
+ super(description);
+ }
+}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java?rev=344403&r1=344402&r2=344403&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/fs/NFSDataInputStream.java Tue Nov 15 10:00:14 2005
@@ -109,13 +109,15 @@
stopSumming();
return;
}
- if (crc != (int)sum.getValue()) {
- fs.reportChecksumFailure(file, (NFSInputStream)in,
- getPos()-delta, bytesPerSum, crc);
- throw new IOException("Checksum error: "+file);
- }
+ int sumValue = (int)sum.getValue();
sum.reset();
inSum = 0;
+ if (crc != sumValue) {
+ long pos = getPos() - delta;
+ fs.reportChecksumFailure(file, (NFSInputStream)in,
+ pos, bytesPerSum, crc);
+ throw new ChecksumException("Checksum error: "+file+" at "+pos);
+ }
}
public long getPos() throws IOException {
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java?rev=344403&r1=344402&r2=344403&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/io/SequenceFile.java Tue Nov 15 10:00:14 2005
@@ -359,23 +359,39 @@
if (in.getPos() >= end)
return -1;
- int length = in.readInt();
+ try {
+ int length = in.readInt();
- if (version[3] > 1 && sync != null &&
- length == SYNC_ESCAPE) { // process a sync entry
- //LOG.info("sync@"+in.getPos());
- in.readFully(syncCheck); // read syncCheck
- if (!Arrays.equals(sync, syncCheck)) // check it
- throw new IOException("File is corrupt!");
- syncSeen = true;
- length = in.readInt(); // re-read length
- } else {
- syncSeen = false;
+ if (version[3] > 1 && sync != null &&
+ length == SYNC_ESCAPE) { // process a sync entry
+ //LOG.info("sync@"+in.getPos());
+ in.readFully(syncCheck); // read syncCheck
+ if (!Arrays.equals(sync, syncCheck)) // check it
+ throw new IOException("File is corrupt!");
+ syncSeen = true;
+ length = in.readInt(); // re-read length
+ } else {
+ syncSeen = false;
+ }
+
+ int keyLength = in.readInt();
+ buffer.write(in, length);
+ return keyLength;
+
+ } catch (ChecksumException e) { // checksum failure
+ handleChecksumException(e);
+ return next(buffer);
}
+ }
- int keyLength = in.readInt();
- buffer.write(in, length);
- return keyLength;
+ private void handleChecksumException(ChecksumException e) // invoked from the ChecksumException catch blocks in this class
+ throws IOException { // rethrows e itself unless skipping is enabled
+ if (NutchConf.get().getBoolean("io.skip.checksum.errors", false)) { // skipping is off unless configured (default false)
+ LOG.warning("Bad checksum at "+getPosition()+". Skipping entries.");
+ sync(getPosition()+NutchConf.get().getInt("io.bytes.per.checksum", 512)); // resume one checksum chunk (default 512 bytes) past the current position; NOTE(review): presumably sync() then scans forward to the next sync marker -- confirm
+ } else {
+ throw e; // skipping disabled: propagate the original checksum failure
+ }
 }
/** Set the current byte position in the input file. */
@@ -390,20 +406,24 @@
return;
}
- seek(position+4); // skip escape
- in.readFully(syncCheck);
- int syncLen = sync.length;
- for (int i = 0; in.getPos() < end; i++) {
- int j = 0;
- for (; j < syncLen; j++) {
- if (sync[j] != syncCheck[(i+j)%syncLen])
- break;
- }
- if (j == syncLen) {
- in.seek(in.getPos() - SYNC_SIZE); // position before sync
- return;
+ try {
+ seek(position+4); // skip escape
+ in.readFully(syncCheck);
+ int syncLen = sync.length;
+ for (int i = 0; in.getPos() < end; i++) {
+ int j = 0;
+ for (; j < syncLen; j++) {
+ if (sync[j] != syncCheck[(i+j)%syncLen])
+ break;
+ }
+ if (j == syncLen) {
+ in.seek(in.getPos() - SYNC_SIZE); // position before sync
+ return;
+ }
+ syncCheck[i%syncLen] = in.readByte();
}
- syncCheck[i%syncLen] = in.readByte();
+ } catch (ChecksumException e) { // checksum failure
+ handleChecksumException(e);
}
}