You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2014/04/08 20:37:41 UTC
svn commit: r1585812 - in /hive/branches/branch-0.13: ./
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/test/org/apache/hadoop/hive/ql/io/orc/
Author: omalley
Date: Tue Apr 8 18:37:41 2014
New Revision: 1585812
URL: http://svn.apache.org/r1585812
Log:
HIVE-6759. Don't trust file lengths from HDFS when ORC files are being written.
(omalley)
Modified:
hive/branches/branch-0.13/ (props changed)
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
Propchange: hive/branches/branch-0.13/
------------------------------------------------------------------------------
Merged /hive/trunk:r1585811
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1585812&r1=1585811&r2=1585812&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Tue Apr 8 18:37:41 2014
@@ -315,8 +315,15 @@ final class ReaderImpl implements Reader
) throws IOException {
FSDataInputStream file = fs.open(path);
+ // figure out the size of the file using the option or filesystem
+ long size;
+ if (maxFileLength == Long.MAX_VALUE) {
+ size = fs.getFileStatus(path).getLen();
+ } else {
+ size = maxFileLength;
+ }
+
//read last bytes into buffer to get PostScript
- long size = Math.min(maxFileLength, fs.getFileStatus(path).getLen());
int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
file.seek(size - readSize);
ByteBuffer buffer = ByteBuffer.allocate(readSize);
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1585812&r1=1585811&r2=1585812&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java Tue Apr 8 18:37:41 2014
@@ -134,7 +134,7 @@ public class TestOrcRawRecordMerger {
}
// can add .verboseLogging() to cause Mockito to log invocations
- private final MockSettings settings = Mockito.withSettings().verboseLogging();
+ private final MockSettings settings = Mockito.withSettings();
private Reader createMockReader() throws IOException {
Reader reader = Mockito.mock(Reader.class, settings);
Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java?rev=1585812&r1=1585811&r2=1585812&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java Tue Apr 8 18:37:41 2014
@@ -23,19 +23,139 @@ import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.io.sarg.TestSearchArgumentImpl;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.Test;
+import org.mockito.MockSettings;
+import org.mockito.Mockito;
public class TestRecordReaderImpl {
+ // Mockito settings shared by the mocks in this test class. Verbose logging
+ // is off by default; add .verboseLogging() here to make Mockito log every
+ // invocation while debugging.
+ private final MockSettings settings = Mockito.withSettings();
+
+ /**
+  * An in-memory stream over the first {@code length} bytes of a byte array
+  * that also implements the positioned-read and seek interfaces, so it can
+  * be wrapped in an FSDataInputStream for tests.
+  */
+ static class BufferInStream
+     extends InputStream implements PositionedReadable, Seekable {
+   private final byte[] buffer;
+   private final int length;
+   private int position = 0;
+
+   /**
+    * @param bytes the backing array (not copied)
+    * @param length the number of valid bytes at the start of {@code bytes}
+    */
+   BufferInStream(byte[] bytes, int length) {
+     this.buffer = bytes;
+     this.length = length;
+   }
+
+   /**
+    * Reads the next byte.
+    * @return the byte as a value in 0..255, or -1 at the end of the buffer
+    */
+   @Override
+   public int read() {
+     if (position < length) {
+       // mask so that bytes >= 0x80 are not sign-extended into negative
+       // values, which callers would mistake for end of stream
+       return buffer[position++] & 0xff;
+     }
+     return -1;
+   }
+
+   /**
+    * Reads up to {@code length} bytes from the current position and
+    * advances the position by the number of bytes read.
+    * @return the number of bytes read, or -1 if no bytes are left
+    */
+   @Override
+   public int read(byte[] bytes, int offset, int length) {
+     int copied = copyFrom(position, bytes, offset, length);
+     if (copied > 0) {
+       position += copied;
+     }
+     return copied;
+   }
+
+   /**
+    * Positioned read: reads from {@code position} without moving the
+    * stream's current position.
+    * @return the number of bytes read, or -1 if {@code position} is at or
+    *         past the end of the buffer
+    */
+   @Override
+   public int read(long position, byte[] bytes, int offset, int length) {
+     return copyFrom(position, bytes, offset, length);
+   }
+
+   /**
+    * Positioned read of exactly {@code length} bytes; the stream's current
+    * position is left untouched.
+    * @throws IOException if the buffer ends before {@code length} bytes
+    *         could be read
+    */
+   @Override
+   public void readFully(long position, byte[] bytes, int offset,
+                         int length) throws IOException {
+     long from = position;
+     while (length > 0) {
+       int result = copyFrom(from, bytes, offset, length);
+       // check before adjusting the counters so that the loop cannot spin
+       // and the reported offset is not skewed by the -1 result
+       if (result < 0) {
+         throw new IOException("Read past end of buffer at " + offset);
+       }
+       from += result;
+       offset += result;
+       length -= result;
+     }
+   }
+
+   @Override
+   public void readFully(long position, byte[] bytes) throws IOException {
+     readFully(position, bytes, 0, bytes.length);
+   }
+
+   /** Moves the current position; the value is truncated to an int. */
+   @Override
+   public void seek(long position) {
+     this.position = (int) position;
+   }
+
+   @Override
+   public long getPos() {
+     return position;
+   }
+
+   /**
+    * Moves the current position like {@link #seek(long)}.
+    * @return always false, there is only one copy of the data
+    */
+   @Override
+   public boolean seekToNewSource(long position) throws IOException {
+     this.position = (int) position;
+     return false;
+   }
+
+   /**
+    * Copies up to {@code length} bytes starting at {@code from} into
+    * {@code bytes} without touching the stream position.
+    * @return the number of bytes copied, 0 if {@code length} is 0, or -1 if
+    *         {@code from} lies outside the valid part of the buffer
+    */
+   private int copyFrom(long from, byte[] bytes, int offset, int length) {
+     if (length == 0) {
+       return 0;
+     }
+     if (from < 0 || from >= this.length) {
+       return -1;
+     }
+     int start = (int) from;
+     int lengthToRead = Math.min(length, this.length - start);
+     System.arraycopy(buffer, start, bytes, offset, lengthToRead);
+     return lengthToRead;
+   }
+ }
+
+ /**
+  * Builds a minimal ORC tail (footer, postscript, postscript length byte)
+  * in memory and opens it through a mocked FileSystem whose FileStatus
+  * reports a length of 10. The reader options carry the real length via
+  * maxLength, so creating the reader only succeeds if that option is used
+  * instead of the length reported by the file system.
+  */
+ @Test
+ public void testMaxLengthToReader() throws Exception {
+   // serialize footer + postscript + one byte holding the postscript size
+   OrcProto.Type structType = OrcProto.Type.newBuilder()
+       .setKind(OrcProto.Type.Kind.STRUCT)
+       .build();
+   OrcProto.Footer.Builder footerBuilder = OrcProto.Footer.newBuilder();
+   footerBuilder.setHeaderLength(0);
+   footerBuilder.setContentLength(0);
+   footerBuilder.setNumberOfRows(0);
+   footerBuilder.setRowIndexStride(0);
+   footerBuilder.addTypes(structType);
+   OrcProto.Footer footer = footerBuilder.build();
+   OrcProto.PostScript.Builder psBuilder = OrcProto.PostScript.newBuilder();
+   psBuilder.setCompression(OrcProto.CompressionKind.NONE);
+   psBuilder.setFooterLength(footer.getSerializedSize());
+   psBuilder.setMagic("ORC");
+   psBuilder.addVersion(0);
+   psBuilder.addVersion(11);
+   OrcProto.PostScript postScript = psBuilder.build();
+   DataOutputBuffer tail = new DataOutputBuffer();
+   footer.writeTo(tail);
+   postScript.writeTo(tail);
+   tail.write(postScript.getSerializedSize());
+
+   // mocked file system: serves the in-memory bytes, reports a bogus length
+   Path orcPath = new Path("/dir/file.orc");
+   FileSystem mockFs = Mockito.mock(FileSystem.class, settings);
+   BufferInStream rawStream =
+       new BufferInStream(tail.getData(), tail.getLength());
+   FSDataInputStream stream = new FSDataInputStream(rawStream);
+   Mockito.when(mockFs.open(orcPath)).thenReturn(stream);
+
+   Configuration conf = new Configuration();
+   OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+   readerOptions.filesystem(mockFs);
+   readerOptions.maxLength(tail.getLength());
+   FileStatus bogusStatus = new FileStatus(10, false, 3, 3000, 0, orcPath);
+   Mockito.when(mockFs.getFileStatus(orcPath)).thenReturn(bogusStatus);
+
+   // passes as long as the reader can be created without an exception
+   OrcFile.createReader(orcPath, readerOptions);
+ }
+
@Test
public void testCompareToRangeInt() throws Exception {
assertEquals(Location.BEFORE,
@@ -671,7 +791,7 @@ public class TestRecordReaderImpl {
rowGroups = null;
columns = new boolean[]{true, false, true};
result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
- columns, rowGroups, false, encodings, types, 32768);
+ columns, null, false, encodings, types, 32768);
assertThat(result, is(diskRanges(100000, 102000, 102000, 200000)));
rowGroups = new boolean[]{false, true, false, false, false, false};