You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2011/09/06 19:13:27 UTC

svn commit: r1165763 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/io/RCFile.java test/org/apache/hadoop/hive/ql/io/TestRCFile.java

Author: heyongqiang
Date: Tue Sep  6 17:13:26 2011
New Revision: 1165763

URL: http://svn.apache.org/viewvc?rev=1165763&view=rev
Log:
HIVE-2404: Allow RCFile Reader to tolerate corruptions (Ramkumar Vadali via He Yongqiang)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1165763&r1=1165762&r2=1165763&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Tue Sep  6 17:13:26 2011
@@ -155,6 +155,9 @@ public class RCFile {
 
   public static final String COLUMN_NUMBER_CONF_STR = "hive.io.rcfile.column.number.conf";
 
+  public static final String TOLERATE_CORRUPTIONS_CONF_STR =
+    "hive.io.rcfile.tolerate.corruptions";
+
   /*
    * these header and Sync are kept from SequenceFile, for compatible of
    * SequenceFile's format.
@@ -351,6 +354,7 @@ public class RCFile {
     private boolean[] decompressedFlag = null;
     private int numCompressed;
     private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
+    private boolean lazyDecompress = true;
 
     boolean inited = false;
 
@@ -381,7 +385,13 @@ public class RCFile {
 
     public ValueBuffer(KeyBuffer currentKey, int columnNumber,
         boolean[] skippedCols, CompressionCodec codec) throws IOException {
+      this(currentKey, columnNumber, skippedCols, codec, true);
+    }
 
+    public ValueBuffer(KeyBuffer currentKey, int columnNumber,
+      boolean[] skippedCols, CompressionCodec codec, boolean lazyDecompress)
+        throws IOException {
+      this.lazyDecompress = lazyDecompress;
       keyBuffer = currentKey;
       this.columnNumber = columnNumber;
 
@@ -469,7 +479,12 @@ public class RCFile {
         valBuf.reset();
         valBuf.write(in, vaRowsLen);
         if (codec != null) {
-          decompressedFlag[addIndex] = false;
+          if (lazyDecompress) {
+            decompressedFlag[addIndex] = false;
+          } else {
+            lazyDecompressCallbackObjs[addIndex].decompress();
+            decompressedFlag[addIndex] = true;
+          }
         }
         addIndex++;
       }
@@ -1030,6 +1045,8 @@ public class RCFile {
 
     private int passedRowsNum = 0;
 
+    // Should we try to tolerate corruption? Default is No.
+    private boolean tolerateCorruptions = false;
 
     private boolean decompress = false;
 
@@ -1055,6 +1072,8 @@ public class RCFile {
     /** Create a new RCFile reader. */
     public Reader(FileSystem fs, Path file, int bufferSize, Configuration conf,
         long start, long length) throws IOException {
+      tolerateCorruptions = conf.getBoolean(
+        TOLERATE_CORRUPTIONS_CONF_STR, false);
       conf.setInt("io.file.buffer.size", bufferSize);
       this.file = file;
       in = openFile(fs, file, bufferSize, length);
@@ -1139,7 +1158,9 @@ public class RCFile {
       }
 
       currentKey = createKeyBuffer();
-      currentValue = new ValueBuffer(null, columnNumber, skippedColIDs, codec);
+      boolean lazyDecompress = !tolerateCorruptions;
+      currentValue = new ValueBuffer(
+        null, columnNumber, skippedColIDs, codec, lazyDecompress);
     }
 
     /**
@@ -1286,11 +1307,6 @@ public class RCFile {
       return new KeyBuffer(columnNumber);
     }
 
-    @SuppressWarnings("unused")
-    private ValueBuffer createValueBuffer(KeyBuffer key) throws IOException {
-      return new ValueBuffer(key, skippedColIDs);
-    }
-
     /**
      * Read and return the next record length, potentially skipping over a sync
      * block.
@@ -1501,10 +1517,14 @@ public class RCFile {
       }
 
       int ret = -1;
-      try {
-        ret = nextKeyBuffer();
-      } catch (EOFException eof) {
-        eof.printStackTrace();
+      if (tolerateCorruptions) {
+        ret = nextKeyValueTolerateCorruptions();
+      } else {
+        try {
+          ret = nextKeyBuffer();
+        } catch (EOFException eof) {
+          eof.printStackTrace();
+        }
       }
       if (ret > 0) {
         return next(readRows);
@@ -1512,6 +1532,35 @@ public class RCFile {
       return false;
     }
 
+    /**
+     * Reads the next key buffer and, eagerly, its value buffer.
+     * Returns nextKeyBuffer()'s result, or -1 if the data looks corrupt.
+     */
+    private int nextKeyValueTolerateCorruptions() throws IOException {
+      final long startPos = in.getPos();
+      try {
+        final int keyResult = nextKeyBuffer();
+        currentValueBuffer();
+        return keyResult;
+      } catch (EOFException eof) {
+        LOG.warn("Ignoring EOFException in file " + file +
+                 " after offset " + startPos, eof);
+      } catch (ChecksumException ce) {
+        LOG.warn("Ignoring ChecksumException in file " + file +
+                 " after offset " + startPos, ce);
+      } catch (IOException ioe) {
+        // Any other IOException is likely a genuine read error rather than
+        // corruption, so it is propagated instead of being tolerated.
+        throw ioe;
+      } catch (Throwable t) {
+        // Non-IOException failures (typically OOM, IndexOutOfBounds,
+        // InternalError) most likely come from decoding corrupt bytes.
+        LOG.warn("Ignoring unknown error in " + file +
+                 " after offset " + startPos, t);
+      }
+      return -1;
+    }
+
     public boolean hasRecordsInBuffer() {
       return readRowsIndexInBuffer < recordsNumInValBuffer;
     }
@@ -1528,11 +1577,18 @@ public class RCFile {
         return;
       }
 
-      if (!currentValue.inited) {
-        currentValueBuffer();
-        // do this only when not initialized, but we may need to find a way to
-        // tell the caller how to initialize the valid size
+      if (tolerateCorruptions) {
+        if (!currentValue.inited) {
+          currentValueBuffer();
+        }
         ret.resetValid(columnNumber);
+      } else {
+        if (!currentValue.inited) {
+          currentValueBuffer();
+          // do this only when not initialized, but we may need to find a way to
+          // tell the caller how to initialize the valid size
+          ret.resetValid(columnNumber);
+        }
       }
 
       // we do not use BytesWritable here to avoid the byte-copy from

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java?rev=1165763&r1=1165762&r2=1165763&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java Tue Sep  6 17:13:26 2011
@@ -19,10 +19,12 @@
 package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.io.InputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Properties;
+import java.util.Random;
 
 import junit.framework.TestCase;
 
@@ -207,6 +209,59 @@ public class TestRCFile extends TestCase
     reader.close();
   }
 
+  /**
+   * Corrupts the middle of a freshly written RCFile and checks that a reader
+   * with corruption tolerance enabled can be drained without throwing.
+   */
+  public void testReadCorruptFile() throws IOException, SerDeException {
+    fs.delete(file, true);
+
+    byte[][] record = {null, null, null, null, null, null, null, null};
+
+    RCFileOutputFormat.setColumnNumber(conf, expectedFieldsData.length);
+    RCFile.Writer writer = new RCFile.Writer(fs, conf, file, null,
+        new DefaultCodec());
+    BytesRefArrayWritable bytes = new BytesRefArrayWritable(record.length);
+    final int recCount = 100;
+    Random rand = new Random();
+    for (int recIdx = 0; recIdx < recCount; recIdx++) {
+      for (int i = 0; i < record.length; i++) {
+        record[i] = Integer.toString(rand.nextInt()).getBytes("UTF-8");
+        bytes.set(i, new BytesRefWritable(record[i], 0, record[i].length));
+      }
+      writer.append(bytes);
+      bytes.clear();
+    }
+    writer.close();
+
+    // Insert junk in middle of file. Assumes file is on local disk.
+    RandomAccessFile raf = new RandomAccessFile(file.toUri().getPath(), "rw");
+    try {
+      long corruptOffset = raf.length() / 2;
+      // Log the path; RandomAccessFile has no useful toString().
+      LOG.info("corrupting " + file + " at offset " + corruptOffset);
+      raf.seek(corruptOffset);
+      raf.writeBytes("junkjunkjunkjunkjunkjunkjunkjunk");
+    } finally {
+      raf.close();
+    }
+
+    // Set the option for tolerating corruptions. The read should succeed.
+    Configuration tmpConf = new Configuration(conf);
+    tmpConf.setBoolean(RCFile.TOLERATE_CORRUPTIONS_CONF_STR, true);
+    RCFile.Reader reader = new RCFile.Reader(fs, file, tmpConf);
+    try {
+      LongWritable rowID = new LongWritable();
+      while (reader.next(rowID)) {
+        BytesRefArrayWritable cols = new BytesRefArrayWritable();
+        reader.getCurrentRow(cols);
+        cols.resetValid(record.length);
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
   public void testWriteAndFullyRead() throws IOException, SerDeException {
     writeTest(fs, 10000, file, bytesArray);
     fullyReadTest(fs, 10000, file);