You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2011/09/06 19:13:27 UTC
svn commit: r1165763 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/io/RCFile.java
test/org/apache/hadoop/hive/ql/io/TestRCFile.java
Author: heyongqiang
Date: Tue Sep 6 17:13:26 2011
New Revision: 1165763
URL: http://svn.apache.org/viewvc?rev=1165763&view=rev
Log:
HIVE-2404: Allow RCFile Reader to tolerate corruptions (Ramkumar Vadali via He Yongqiang)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1165763&r1=1165762&r2=1165763&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Sep 6 17:13:26 2011
@@ -155,6 +155,9 @@ public class RCFile {
public static final String COLUMN_NUMBER_CONF_STR = "hive.io.rcfile.column.number.conf";
+ public static final String TOLERATE_CORRUPTIONS_CONF_STR =
+ "hive.io.rcfile.tolerate.corruptions";
+
/*
* these header and Sync are kept from SequenceFile, for compatible of
* SequenceFile's format.
@@ -351,6 +354,7 @@ public class RCFile {
private boolean[] decompressedFlag = null;
private int numCompressed;
private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
+ private boolean lazyDecompress = true;
boolean inited = false;
@@ -381,7 +385,13 @@ public class RCFile {
public ValueBuffer(KeyBuffer currentKey, int columnNumber,
boolean[] skippedCols, CompressionCodec codec) throws IOException {
+ this(currentKey, columnNumber, skippedCols, codec, true);
+ }
+ public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+ boolean[] skippedCols, CompressionCodec codec, boolean lazyDecompress)
+ throws IOException {
+ this.lazyDecompress = lazyDecompress;
keyBuffer = currentKey;
this.columnNumber = columnNumber;
@@ -469,7 +479,12 @@ public class RCFile {
valBuf.reset();
valBuf.write(in, vaRowsLen);
if (codec != null) {
- decompressedFlag[addIndex] = false;
+ if (lazyDecompress) {
+ decompressedFlag[addIndex] = false;
+ } else {
+ lazyDecompressCallbackObjs[addIndex].decompress();
+ decompressedFlag[addIndex] = true;
+ }
}
addIndex++;
}
@@ -1030,6 +1045,8 @@ public class RCFile {
private int passedRowsNum = 0;
+ // Should we try to tolerate corruption? Default is No.
+ private boolean tolerateCorruptions = false;
private boolean decompress = false;
@@ -1055,6 +1072,8 @@ public class RCFile {
/** Create a new RCFile reader. */
public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
long start, long length) throws IOException {
+ tolerateCorruptions = conf.getBoolean(
+ TOLERATE_CORRUPTIONS_CONF_STR, false);
conf.setInt("io.file.buffer.size", bufferSize);
this.file = file;
in = openFile(fs, file, bufferSize, length);
@@ -1139,7 +1158,9 @@ public class RCFile {
}
currentKey = createKeyBuffer();
- currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
+ boolean lazyDecompress = !tolerateCorruptions;
+ currentValue = new ValueBuffer(
+ null, columnNumber, skippedColIDs, codec, lazyDecompress);
}
/**
@@ -1286,11 +1307,6 @@ public class RCFile {
return new KeyBuffer(columnNumber);
}
- @SuppressWarnings("unused")
- private ValueBuffer createValueBuffer(KeyBuffer key) throws IOException {
- return new ValueBuffer(key, skippedColIDs);
- }
-
/**
* Read and return the next record length, potentially skipping over a sync
* block.
@@ -1501,10 +1517,14 @@ public class RCFile {
}
int ret = -1;
- try {
- ret = nextKeyBuffer();
- } catch (EOFException eof) {
- eof.printStackTrace();
+ if (tolerateCorruptions) {
+ ret = nextKeyValueTolerateCorruptions();
+ } else {
+ try {
+ ret = nextKeyBuffer();
+ } catch (EOFException eof) {
+ eof.printStackTrace();
+ }
}
if (ret > 0) {
return next(readRows);
@@ -1512,6 +1532,35 @@ public class RCFile {
return false;
}
+ /**
+ * Corruption-tolerant variant of reading the next row group: reads the next
+ * key buffer and then immediately loads the matching value buffer, so that a
+ * corrupted value is detected here rather than later while rows are served.
+ *
+ * @return the value returned by nextKeyBuffer() when both the key and value
+ * buffers were read; -1 when an EOFException, a ChecksumException or any
+ * non-IOException Throwable was raised (the error is logged and ignored).
+ * @throws IOException any IOException other than EOFException and
+ * ChecksumException, which is treated as a genuine read error.
+ */
+ private int nextKeyValueTolerateCorruptions() throws IOException {
+ // Remember where this read started so the warning can point at it.
+ long currentOffset = in.getPos();
+ int ret = -1;
+ try {
+ ret = nextKeyBuffer();
+ this.currentValueBuffer();
+ } catch (EOFException eof) {
+ LOG.warn("Ignoring EOFException in file " + file +
+ " after offset " + currentOffset, eof);
+ // nextKeyBuffer() may already have succeeded; discard its result.
+ ret = -1;
+ } catch (ChecksumException ce) {
+ LOG.warn("Ignoring ChecksumException in file " + file +
+ " after offset " + currentOffset, ce);
+ ret = -1;
+ } catch (IOException ioe) {
+ // We have an IOException other than EOF or ChecksumException.
+ // This is likely a read-error, not corruption, re-throw.
+ // This clause must stay before catch (Throwable) so that such
+ // IOExceptions are not swallowed by the generic handler below.
+ throw ioe;
+ } catch (Throwable t) {
+ // We got an exception that is not IOException
+ // (typically OOM, IndexOutOfBounds, InternalError).
+ // This is most likely a corruption.
+ // NOTE(review): this also swallows Errors in general (any Throwable
+ // that is not an IOException); confirm that is intended.
+ LOG.warn("Ignoring unknown error in " + file +
+ " after offset " + currentOffset, t);
+ ret = -1;
+ }
+ return ret;
+ }
+
public boolean hasRecordsInBuffer() {
return readRowsIndexInBuffer < recordsNumInValBuffer;
}
@@ -1528,11 +1577,18 @@ public class RCFile {
return;
}
- if (!currentValue.inited) {
- currentValueBuffer();
- // do this only when not initialized, but we may need to find a way to
- // tell the caller how to initialize the valid size
+ if (tolerateCorruptions) {
+ if (!currentValue.inited) {
+ currentValueBuffer();
+ }
ret.resetValid(columnNumber);
+ } else {
+ if (!currentValue.inited) {
+ currentValueBuffer();
+ // do this only when not initialized, but we may need to find a way to
+ // tell the caller how to initialize the valid size
+ ret.resetValid(columnNumber);
+ }
}
// we do not use BytesWritable here to avoid the byte-copy from
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java?rev=1165763&r1=1165762&r2=1165763&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java Tue Sep 6 17:13:26 2011
@@ -19,10 +19,12 @@
package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
+import java.util.Random;
import junit.framework.TestCase;
@@ -207,6 +209,59 @@ public class TestRCFile extends TestCase
reader.close();
}
+  /**
+   * Writes a small RCFile, overwrites bytes in the middle of it with junk and
+   * verifies that a reader configured with
+   * {@link RCFile#TOLERATE_CORRUPTIONS_CONF_STR} set to true can iterate over
+   * the file without throwing.
+   */
+  public void testReadCorruptFile() throws IOException, SerDeException {
+    fs.delete(file, true);
+
+    byte[][] record = {null, null, null, null, null, null, null, null};
+
+    // The column count must match the number of columns actually written.
+    RCFileOutputFormat.setColumnNumber(conf, record.length);
+    RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null,
+        new DefaultCodec());
+    BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);
+    final int recCount = 100;
+    // Fixed seed: the corrupted offset depends on the file length, which
+    // depends on the random values, so an unseeded Random would corrupt a
+    // different spot on every run and make failures unreproducible.
+    Random rand = new Random(42L);
+    for (int recIdx = 0; recIdx < recCount; recIdx++) {
+      for (int i = 0; i < record.length; i++) {
+        record[i] = Integer.toString(rand.nextInt()).getBytes("UTF-8");
+        bytes.set(i, new BytesRefWritable(record[i], 0, record[i].length));
+      }
+      writer.append(bytes);
+      bytes.clear();
+    }
+    writer.close();
+
+    // Insert junk in the middle of the file. Assumes the file is on the local
+    // file system so that it can be opened with RandomAccessFile.
+    RandomAccessFile raf = new RandomAccessFile(file.toUri().getPath(), "rw");
+    try {
+      long corruptOffset = raf.length() / 2;
+      LOG.info("corrupting " + file + " at offset " + corruptOffset);
+      raf.seek(corruptOffset);
+      raf.writeBytes("junkjunkjunkjunkjunkjunkjunkjunk");
+    } finally {
+      raf.close();
+    }
+
+    // Set the option for tolerating corruptions. The read should succeed,
+    // i.e. next()/getCurrentRow() must not throw.
+    Configuration tmpConf = new Configuration(conf);
+    tmpConf.setBoolean(RCFile.TOLERATE_CORRUPTIONS_CONF_STR, true);
+    RCFile.Reader reader = new RCFile.Reader(fs, file, tmpConf);
+    try {
+      LongWritable rowID = new LongWritable();
+      int rowsRead = 0;
+      while (reader.next(rowID)) {
+        BytesRefArrayWritable cols = new BytesRefArrayWritable();
+        reader.getCurrentRow(cols);
+        cols.resetValid(record.length);
+        rowsRead++;
+      }
+      LOG.info("read " + rowsRead + " rows from corrupted file " + file);
+    } finally {
+      reader.close();
+    }
+  }
+
public void testWriteAndFullyRead() throws IOException, SerDeException {
writeTest(fs, 10000, file, bytesArray);
fullyReadTest(fs, 10000, file);