Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/15 21:05:36 UTC
svn commit: r1514438 [1/3] - in /hive/trunk:
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/java/org/apache/hadoop/hive/ql/io/sarg/
ql/src/java/org/apache/hadoo...
Author: gunther
Date: Thu Aug 15 19:05:35 2013
New Revision: 1514438
URL: http://svn.apache.org/r1514438
Log:
HIVE-4246: Implement predicate pushdown for ORC (Owen O'Malley via Gunther Hagleitner)
Added:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRunLengthByteReader.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRunLengthIntegerReader.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
hive/trunk/ql/src/test/results/compiler/plan/union.q.xml
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java
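The change lets the ORC reader use the per-row-group column statistics (minimum/maximum) that are already stored in the file to skip row groups that cannot satisfy the pushed-down predicate. The snippet below is a simplified, self-contained illustration of that pruning decision, not the patch's API; the patch itself models the answer with the richer SearchArgument.TruthValue lattice shown in the RecordReaderImpl.java diff further down.

    // Simplified sketch of min/max pruning for a predicate like "col = literal".
    public class RowGroupPruningSketch {
      enum Truth { NO, MAYBE }   // the real code uses SearchArgument.TruthValue

      static Truth equalsPredicate(long min, long max, long literal) {
        // if the literal falls outside [min, max], no row in the group can match
        return (literal < min || literal > max) ? Truth.NO : Truth.MAYBE;
      }

      public static void main(String[] args) {
        // a row group whose statistics say its values lie in [100, 200]
        System.out.println(equalsPredicate(100, 200, 42));   // NO    -> skip the group
        System.out.println(equalsPredicate(100, 200, 150));  // MAYBE -> must be read
      }
    }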
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Thu Aug 15 19:05:35 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -241,6 +242,7 @@ public class TableScanOperator extends O
// and 2) it will fail some join and union queries if this is added forcibly
// into tableScanDesc
java.util.ArrayList<Integer> neededColumnIDs;
+ List<String> neededColumns;
public void setNeededColumnIDs(java.util.ArrayList<Integer> orign_columns) {
neededColumnIDs = orign_columns;
@@ -250,6 +252,14 @@ public class TableScanOperator extends O
return neededColumnIDs;
}
+ public void setNeededColumns(List<String> columnNames) {
+ neededColumns = columnNames;
+ }
+
+ public List<String> getNeededColumns() {
+ return neededColumns;
+ }
+
@Override
public OperatorType getType() {
return OperatorType.TABLESCAN;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Aug 15 19:05:35 2013
@@ -428,6 +428,8 @@ public class HiveInputFormat<K extends W
} else {
ColumnProjectionUtils.setFullyReadColumns(jobConf);
}
+ ColumnProjectionUtils.appendReadColumnNames(jobConf,
+ tableScan.getNeededColumns());
pushFilters(jobConf, tableScan);
}
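Judging from how OrcRecordReader consumes it later in this commit (splitting the stored value on commas), appendReadColumnNames records the scanned column names as a comma-separated list in the job configuration, next to the serialized filter expression that pushFilters stores. A rough sketch of that round trip; the key string below is a hypothetical stand-in, since the literal value of ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR is not shown in this diff.

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;

    // Sketch of the producer/consumer hand-off for the needed-column names.
    public class ColumnNamesRoundTripSketch {
      // hypothetical key; the real code uses READ_COLUMN_NAMES_CONF_STR
      static final String READ_COLUMN_NAMES_KEY = "example.read.column.names";

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // producer side (HiveInputFormat): record the scanned column names
        conf.set(READ_COLUMN_NAMES_KEY, "state,population");
        // consumer side (OrcRecordReader): split them back out
        String[] columnNames = conf.get(READ_COLUMN_NAMES_KEY).split(",");
        System.out.println(Arrays.toString(columnNames));  // [state, population]
      }
    }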
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Thu Aug 15 19:05:35 2013
@@ -39,7 +39,7 @@ class BitFieldReader {
current = 0xff & input.next();
bitsLeft = 8;
} else {
- throw new EOFException("Read past end of bit field from " + input);
+ throw new EOFException("Read past end of bit field from " + this);
}
}
@@ -85,4 +85,10 @@ class BitFieldReader {
bitsLeft = (int) (8 - (totalBits % 8));
}
}
+
+ @Override
+ public String toString() {
+ return "bit reader current: " + current + " bits left: " + bitsLeft +
+ " bit size: " + bitSize + " from " + input;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Thu Aug 15 19:05:35 2013
@@ -25,97 +25,149 @@ abstract class InStream extends InputStr
private static class UncompressedStream extends InStream {
private final String name;
- private byte[] array;
- private int offset;
- private final int base;
- private final int limit;
+ private final ByteBuffer[] bytes;
+ private final long[] offsets;
+ private final long length;
+ private long currentOffset;
+ private byte[] range;
+ private int currentRange;
+ private int offsetInRange;
+ private int limitInRange;
- public UncompressedStream(String name, ByteBuffer input) {
+ public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
+ long length) {
this.name = name;
- this.array = input.array();
- base = input.arrayOffset() + input.position();
- offset = base;
- limit = input.arrayOffset() + input.limit();
+ this.bytes = input;
+ this.offsets = offsets;
+ this.length = length;
+ currentRange = 0;
+ offsetInRange = 0;
+ limitInRange = 0;
+ currentOffset = 0;
}
@Override
public int read() {
- if (offset == limit) {
- return -1;
+ if (offsetInRange >= limitInRange) {
+ if (currentOffset == length) {
+ return -1;
+ }
+ seek(currentOffset);
}
- return 0xff & array[offset++];
+ currentOffset += 1;
+ return 0xff & range[offsetInRange++];
}
@Override
public int read(byte[] data, int offset, int length) {
- if (this.offset == limit) {
- return -1;
+ if (offsetInRange >= limitInRange) {
+ if (currentOffset == this.length) {
+ return -1;
+ }
+ seek(currentOffset);
}
- int actualLength = Math.min(length, limit - this.offset);
- System.arraycopy(array, this.offset, data, offset, actualLength);
- this.offset += actualLength;
+ int actualLength = Math.min(length, limitInRange - offsetInRange);
+ System.arraycopy(range, offsetInRange, data, offset, actualLength);
+ offsetInRange += actualLength;
+ currentOffset += actualLength;
return actualLength;
}
@Override
public int available() {
- return limit - offset;
+ if (offsetInRange < limitInRange) {
+ return limitInRange - offsetInRange;
+ }
+ return (int) (length - currentOffset);
}
@Override
public void close() {
- array = null;
- offset = 0;
+ currentRange = bytes.length;
+ currentOffset = length;
}
@Override
public void seek(PositionProvider index) throws IOException {
- offset = base + (int) index.getNext();
+ seek(index.getNext());
+ }
+
+ public void seek(long desired) {
+ for(int i = 0; i < bytes.length; ++i) {
+ if (offsets[i] <= desired &&
+ desired - offsets[i] < bytes[i].remaining()) {
+ currentOffset = desired;
+ currentRange = i;
+ this.range = bytes[i].array();
+ offsetInRange = bytes[i].arrayOffset() + bytes[i].position();
+ limitInRange = bytes[i].arrayOffset() + bytes[i].limit();
+ offsetInRange += desired - offsets[i];
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Seek in " + name + " to " +
+ desired + " is outside of the data");
}
@Override
public String toString() {
- return "uncompressed stream " + name + " base: " + base +
- " offset: " + offset + " limit: " + limit;
+ return "uncompressed stream " + name + " position: " + currentOffset +
+ " length: " + length + " range: " + currentRange +
+ " offset: " + offsetInRange + " limit: " + limitInRange;
}
}
private static class CompressedStream extends InStream {
private final String name;
- private byte[] array;
+ private final ByteBuffer[] bytes;
+ private final long[] offsets;
private final int bufferSize;
+ private final long length;
private ByteBuffer uncompressed = null;
private final CompressionCodec codec;
- private int offset;
- private final int base;
- private final int limit;
+ private byte[] compressed = null;
+ private long currentOffset;
+ private int currentRange;
+ private int offsetInCompressed;
+ private int limitInCompressed;
private boolean isUncompressedOriginal;
- public CompressedStream(String name, ByteBuffer input,
+ public CompressedStream(String name, ByteBuffer[] input,
+ long[] offsets, long length,
CompressionCodec codec, int bufferSize
) {
- this.array = input.array();
+ this.bytes = input;
this.name = name;
this.codec = codec;
+ this.length = length;
+ this.offsets = offsets;
this.bufferSize = bufferSize;
- base = input.arrayOffset() + input.position();
- offset = base;
- limit = input.arrayOffset() + input.limit();
+ currentOffset = 0;
+ currentRange = 0;
+ offsetInCompressed = 0;
+ limitInCompressed = 0;
}
private void readHeader() throws IOException {
- if (limit - offset > OutStream.HEADER_SIZE) {
- int chunkLength = ((0xff & array[offset + 2]) << 15) |
- ((0xff & array[offset + 1]) << 7) | ((0xff & array[offset]) >> 1);
+ if (compressed == null || offsetInCompressed >= limitInCompressed) {
+ seek(currentOffset);
+ }
+ if (limitInCompressed - offsetInCompressed > OutStream.HEADER_SIZE) {
+ int chunkLength = ((0xff & compressed[offsetInCompressed + 2]) << 15) |
+ ((0xff & compressed[offsetInCompressed + 1]) << 7) |
+ ((0xff & compressed[offsetInCompressed]) >> 1);
if (chunkLength > bufferSize) {
throw new IllegalArgumentException("Buffer size too small. size = " +
bufferSize + " needed = " + chunkLength);
}
- boolean isOriginal = (array[offset] & 0x01) == 1;
- offset += OutStream.HEADER_SIZE;
+ boolean isOriginal = (compressed[offsetInCompressed] & 0x01) == 1;
+ offsetInCompressed += OutStream.HEADER_SIZE;
if (isOriginal) {
isUncompressedOriginal = true;
- uncompressed = ByteBuffer.wrap(array, offset, chunkLength);
+ uncompressed = bytes[currentRange].duplicate();
+ uncompressed.position(offsetInCompressed -
+ bytes[currentRange].arrayOffset());
+ uncompressed.limit(offsetInCompressed + chunkLength);
} else {
if (isUncompressedOriginal) {
uncompressed = ByteBuffer.allocate(bufferSize);
@@ -125,19 +177,21 @@ abstract class InStream extends InputStr
} else {
uncompressed.clear();
}
- codec.decompress(ByteBuffer.wrap(array, offset, chunkLength),
+ codec.decompress(ByteBuffer.wrap(compressed, offsetInCompressed,
+ chunkLength),
uncompressed);
}
- offset += chunkLength;
+ offsetInCompressed += chunkLength;
+ currentOffset += chunkLength + OutStream.HEADER_SIZE;
} else {
- throw new IllegalStateException("Can't read header");
+ throw new IllegalStateException("Can't read header at " + this);
}
}
@Override
public int read() throws IOException {
if (uncompressed == null || uncompressed.remaining() == 0) {
- if (offset == limit) {
+ if (currentOffset == length) {
return -1;
}
readHeader();
@@ -148,7 +202,7 @@ abstract class InStream extends InputStr
@Override
public int read(byte[] data, int offset, int length) throws IOException {
if (uncompressed == null || uncompressed.remaining() == 0) {
- if (this.offset == this.limit) {
+ if (currentOffset == this.length) {
return -1;
}
readHeader();
@@ -164,7 +218,7 @@ abstract class InStream extends InputStr
@Override
public int available() throws IOException {
if (uncompressed == null || uncompressed.remaining() == 0) {
- if (offset == limit) {
+ if (currentOffset == length) {
return 0;
}
readHeader();
@@ -174,27 +228,74 @@ abstract class InStream extends InputStr
@Override
public void close() {
- array = null;
uncompressed = null;
- offset = 0;
+ currentRange = bytes.length;
+ offsetInCompressed = 0;
+ limitInCompressed = 0;
+ currentOffset = length;
}
@Override
public void seek(PositionProvider index) throws IOException {
- offset = base + (int) index.getNext();
- int uncompBytes = (int) index.getNext();
- if (uncompBytes != 0) {
+ seek(index.getNext());
+ long uncompressedBytes = index.getNext();
+ if (uncompressedBytes != 0) {
readHeader();
- uncompressed.position(uncompressed.position() + uncompBytes);
+ uncompressed.position(uncompressed.position() +
+ (int) uncompressedBytes);
} else if (uncompressed != null) {
+ // mark the uncompressed buffer as done
uncompressed.position(uncompressed.limit());
}
}
+ private void seek(long desired) throws IOException {
+ for(int i = 0; i < bytes.length; ++i) {
+ if (offsets[i] <= desired &&
+ desired - offsets[i] < bytes[i].remaining()) {
+ currentRange = i;
+ compressed = bytes[i].array();
+ offsetInCompressed = (int) (bytes[i].arrayOffset() +
+ bytes[i].position() + (desired - offsets[i]));
+ currentOffset = desired;
+ limitInCompressed = bytes[i].arrayOffset() + bytes[i].limit();
+ return;
+ }
+ }
+ // if they are seeking to the precise end, go ahead and let them go there
+ int segments = bytes.length;
+ if (segments != 0 &&
+ desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
+ currentRange = segments - 1;
+ compressed = bytes[currentRange].array();
+ offsetInCompressed = bytes[currentRange].arrayOffset() +
+ bytes[currentRange].limit();
+ currentOffset = desired;
+ limitInCompressed = offsetInCompressed;
+ return;
+ }
+ throw new IOException("Seek outside of data in " + this + " to " +
+ desired);
+ }
+
+ private String rangeString() {
+ StringBuilder builder = new StringBuilder();
+ for(int i=0; i < offsets.length; ++i) {
+ if (i != 0) {
+ builder.append("; ");
+ }
+ builder.append(" range " + i + " = " + offsets[i] + " to " +
+ bytes[i].remaining());
+ }
+ return builder.toString();
+ }
+
@Override
public String toString() {
- return "compressed stream " + name + " base: " + base +
- " offset: " + offset + " limit: " + limit +
+ return "compressed stream " + name + " position: " + currentOffset +
+ " length: " + length + " range: " + currentRange +
+ " offset: " + offsetInCompressed + " limit: " + limitInCompressed +
+ rangeString() +
(uncompressed == null ? "" :
" uncompressed: " + uncompressed.position() + " to " +
uncompressed.limit());
@@ -203,14 +304,29 @@ abstract class InStream extends InputStr
public abstract void seek(PositionProvider index) throws IOException;
+ /**
+ * Create an input stream from a list of buffers.
+ * @param name the name of the stream
+ * @param input the list of ranges of bytes for the stream
+ * @param offsets a list of offsets (the same length as input) that must
+ * contain the first offset of each set of bytes in input
+ * @param length the length in bytes of the stream
+ * @param codec the compression codec
+ * @param bufferSize the compression buffer size
+ * @return an input stream
+ * @throws IOException
+ */
public static InStream create(String name,
- ByteBuffer input,
+ ByteBuffer[] input,
+ long[] offsets,
+ long length,
CompressionCodec codec,
int bufferSize) throws IOException {
if (codec == null) {
- return new UncompressedStream(name, input);
+ return new UncompressedStream(name, input, offsets, length);
} else {
- return new CompressedStream(name, input, codec, bufferSize);
+ return new CompressedStream(name, input, offsets, length, codec,
+ bufferSize);
}
}
}
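The rewritten InStream wraps several disjoint byte ranges (one ByteBuffer per contiguous chunk that was actually read from disk) instead of a single buffer, so seek() first has to find the range containing the requested stream offset. A standalone sketch of that lookup, mirroring the loop in UncompressedStream.seek(long) and CompressedStream.seek(long):

    import java.nio.ByteBuffer;

    // Sketch: locating the byte range that contains an absolute stream offset.
    // offsets[i] is the stream offset at which ranges[i] begins.
    public class RangeSeekSketch {
      static int findRange(ByteBuffer[] ranges, long[] offsets, long desired) {
        for (int i = 0; i < ranges.length; ++i) {
          if (offsets[i] <= desired && desired - offsets[i] < ranges[i].remaining()) {
            return i;
          }
        }
        throw new IllegalArgumentException("Seek to " + desired + " is outside of the data");
      }

      public static void main(String[] args) {
        // stream bytes [0, 100) and [250, 300) were loaded; [100, 250) was skipped
        ByteBuffer[] ranges = { ByteBuffer.allocate(100), ByteBuffer.allocate(50) };
        long[] offsets = { 0L, 250L };
        System.out.println(findRange(ranges, offsets, 42));   // 0
        System.out.println(findRange(ranges, offsets, 260));  // 1
        // findRange(ranges, offsets, 150) would throw: that part was never read
      }
    }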
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Aug 15 19:05:35 2013
@@ -18,13 +18,22 @@
package org.apache.hadoop.hive.ql.io.orc;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -43,6 +52,8 @@ import java.util.List;
public class OrcInputFormat extends FileInputFormat<NullWritable, OrcStruct>
implements InputFormatChecker {
+ private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
+
private static class OrcRecordReader
implements RecordReader<NullWritable, OrcStruct> {
private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
@@ -53,14 +64,38 @@ public class OrcInputFormat extends Fil
OrcRecordReader(Reader file, Configuration conf,
long offset, long length) throws IOException {
- this.reader = file.rows(offset, length,
- findIncludedColumns(file.getTypes(), conf));
+ String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ String columnNamesString =
+ conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+ String[] columnNames = null;
+ SearchArgument sarg = null;
List<OrcProto.Type> types = file.getTypes();
if (types.size() == 0) {
numColumns = 0;
} else {
numColumns = types.get(0).getSubtypesCount();
}
+ columnNames = new String[types.size()];
+ LOG.info("included column ids = " +
+ conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "null"));
+ LOG.info("included columns names = " +
+ conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "null"));
+ boolean[] includeColumn = findIncludedColumns(types, conf);
+ if (serializedPushdown != null && columnNamesString != null) {
+ sarg = SearchArgument.FACTORY.create
+ (Utilities.deserializeExpression(serializedPushdown, conf));
+ LOG.info("ORC pushdown predicate: " + sarg);
+ String[] neededColumnNames = columnNamesString.split(",");
+ int i = 0;
+ for(int columnId: types.get(0).getSubtypesList()) {
+ if (includeColumn[columnId]) {
+ columnNames[columnId] = neededColumnNames[i++];
+ }
+ }
+ } else {
+ LOG.info("No ORC pushdown predicate");
+ }
+ this.reader = file.rows(offset, length,includeColumn, sarg, columnNames);
this.offset = offset;
this.length = length;
}
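On the reader side, the pushed-down filter arrives as a serialized expression under TableScanDesc.FILTER_EXPR_CONF_STR together with the comma-separated column names, and the constructor above lines those names up with ORC type ids. A small standalone sketch of just that mapping step, using plain arrays in place of the protobuf types list; the table layout (columns a, b, c with type ids 1, 2, 3 under the root struct, id 0) is assumed for illustration:

    import java.util.Arrays;

    // Sketch: aligning the needed-column names with ORC column (type) ids.
    public class ColumnNameMappingSketch {
      public static void main(String[] args) {
        int[] rootSubtypes = {1, 2, 3};                       // types.get(0).getSubtypesList()
        boolean[] includeColumn = {true, false, true, true};  // column 'a' (id 1) excluded
        String[] neededColumnNames = "b,c".split(",");        // value read from the job conf

        String[] columnNames = new String[4];                 // indexed by ORC type id
        int i = 0;
        for (int columnId : rootSubtypes) {
          if (includeColumn[columnId]) {
            columnNames[columnId] = neededColumnNames[i++];
          }
        }
        System.out.println(Arrays.toString(columnNames));     // [null, null, b, c]
      }
    }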
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Thu Aug 15 19:05:35 2013
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -38,6 +40,9 @@ import java.util.Properties;
* It transparently passes the object to/from the ORC file reader/writer.
*/
public class OrcSerde implements SerDe {
+
+ private static final Log LOG = LogFactory.getLog(OrcSerde.class);
+
private final OrcSerdeRow row = new OrcSerdeRow();
private ObjectInspector inspector = null;
@@ -129,4 +134,5 @@ public class OrcSerde implements SerDe {
public SerDeStats getSerDeStats() {
return null;
}
+
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Thu Aug 15 19:05:35 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.io.IOException;
@@ -118,8 +119,26 @@ public interface Reader {
* @param include true for each column that should be included
* @return a new RecordReader that will read the specified rows.
* @throws IOException
+ * @deprecated
*/
RecordReader rows(long offset, long length,
boolean[] include) throws IOException;
+ /**
+ * Create a RecordReader that will read a section of a file. It starts reading
+ * at the first stripe after the offset and continues to the stripe that
+ * starts at offset + length. It also accepts a list of columns to read and a
+ * search argument.
+ * @param offset the minimum offset of the first stripe to read
+ * @param length the distance from offset of the first address to stop reading
+ * at
+ * @param include true for each column that should be included
+ * @param sarg a search argument that limits the rows that should be read.
+ * @param neededColumns the names of the included columns
+ * @return the record reader for the rows
+ */
+ RecordReader rows(long offset, long length,
+ boolean[] include, SearchArgument sarg,
+ String[] neededColumns) throws IOException;
+
}
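A minimal usage sketch of the new overload. OrcFile.createReader is the usual entry point for obtaining a Reader but is not part of this diff, and passing null for both sarg and neededColumns keeps the old behaviour, as ReaderImpl's delegation below shows.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.ql.io.orc.RecordReader;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;

    // Sketch: reading one slice of an ORC file with a pushdown predicate.
    public class ReaderRowsSketch {
      static RecordReader openSlice(Path path, long offset, long length,
                                    boolean[] include, SearchArgument sarg,
                                    String[] neededColumns) throws Exception {
        Reader reader = OrcFile.createReader(FileSystem.get(new Configuration()), path);
        // starts at the first stripe after 'offset' and continues to the
        // stripe that starts at offset + length
        return reader.rows(offset, length, include, sarg, neededColumns);
      }
    }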
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Thu Aug 15 19:05:35 2013
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Text;
@@ -307,7 +308,8 @@ final class ReaderImpl implements Reader
buffer.position(psOffset - footerSize);
buffer.limit(psOffset);
}
- InputStream instream = InStream.create("footer", buffer, codec, bufferSize);
+ InputStream instream = InStream.create("footer", new ByteBuffer[]{buffer},
+ new long[]{0L}, footerSize, codec, bufferSize);
footer = OrcProto.Footer.parseFrom(instream);
inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
file.close();
@@ -315,15 +317,22 @@ final class ReaderImpl implements Reader
@Override
public RecordReader rows(boolean[] include) throws IOException {
- return rows(0, Long.MAX_VALUE, include);
+ return rows(0, Long.MAX_VALUE, include, null, null);
}
@Override
public RecordReader rows(long offset, long length, boolean[] include
) throws IOException {
+ return rows(offset, length, include, null, null);
+ }
+
+ @Override
+ public RecordReader rows(long offset, long length, boolean[] include,
+ SearchArgument sarg, String[] columnNames
+ ) throws IOException {
return new RecordReaderImpl(this.getStripes(), fileSystem, path, offset,
- length, footer.getTypesList(), codec, bufferSize,
- include, footer.getRowIndexStride());
+ length, footer.getTypesList(), codec, bufferSize,
+ include, footer.getRowIndexStride(), sarg, columnNames);
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Thu Aug 15 19:05:35 2013
@@ -27,12 +27,18 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -45,6 +51,9 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
class RecordReaderImpl implements RecordReader {
+
+ private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
+
private final FSDataInputStream file;
private final long firstRow;
private final List<StripeInformation> stripes =
@@ -52,17 +61,25 @@ class RecordReaderImpl implements Record
private OrcProto.StripeFooter stripeFooter;
private final long totalRowCount;
private final CompressionCodec codec;
+ private final List<OrcProto.Type> types;
private final int bufferSize;
private final boolean[] included;
private final long rowIndexStride;
private long rowInStripe = 0;
- private int currentStripe = 0;
+ private int currentStripe = -1;
private long rowBaseInStripe = 0;
private long rowCountInStripe = 0;
private final Map<StreamName, InStream> streams =
new HashMap<StreamName, InStream>();
private final TreeReader reader;
private final OrcProto.RowIndex[] indexes;
+ private final SearchArgument sarg;
+ // the leaf predicates for the sarg
+ private final List<PredicateLeaf> sargLeaves;
+ // an array the same length as the sargLeaves that map them to column ids
+ private final int[] filterColumns;
+ // an array about which row groups aren't skipped
+ private boolean[] includedRowGroups = null;
RecordReaderImpl(Iterable<StripeInformation> stripes,
FileSystem fileSystem,
@@ -72,12 +89,27 @@ class RecordReaderImpl implements Record
CompressionCodec codec,
int bufferSize,
boolean[] included,
- long strideRate
+ long strideRate,
+ SearchArgument sarg,
+ String[] columnNames
) throws IOException {
this.file = fileSystem.open(path);
this.codec = codec;
+ this.types = types;
this.bufferSize = bufferSize;
this.included = included;
+ this.sarg = sarg;
+ if (sarg != null) {
+ sargLeaves = sarg.getLeaves();
+ filterColumns = new int[sargLeaves.size()];
+ for(int i=0; i < filterColumns.length; ++i) {
+ String colName = sargLeaves.get(i).getColumnName();
+ filterColumns[i] = findColumns(columnNames, colName);
+ }
+ } else {
+ sargLeaves = null;
+ filterColumns = null;
+ }
long rows = 0;
long skippedRows = 0;
for(StripeInformation stripe: stripes) {
@@ -94,9 +126,17 @@ class RecordReaderImpl implements Record
reader = createTreeReader(path, 0, types, included);
indexes = new OrcProto.RowIndex[types.size()];
rowIndexStride = strideRate;
- if (this.stripes.size() > 0) {
- readStripe();
+ advanceToNextRow(0L);
+ }
+
+ private static int findColumns(String[] columnNames,
+ String columnName) {
+ for(int i=0; i < columnNames.length; ++i) {
+ if (columnName.equals(columnNames[i])) {
+ return i;
+ }
}
+ return -1;
}
private static final class PositionProviderImpl implements PositionProvider {
@@ -1418,113 +1458,707 @@ class RecordReaderImpl implements Record
ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
file.seek(offset);
file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
- return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf,
- codec, bufferSize));
+ return OrcProto.StripeFooter.parseFrom(InStream.create("footer",
+ new ByteBuffer[]{tailBuf}, new long[]{0}, tailLength, codec,
+ bufferSize));
}
- private void readStripe() throws IOException {
- StripeInformation stripe = stripes.get(currentStripe);
- stripeFooter = readStripeFooter(stripe);
- long offset = stripe.getOffset();
- streams.clear();
+ static enum Location {
+ BEFORE, MIN, MIDDLE, MAX, AFTER
+ }
- // if we aren't projecting columns, just read the whole stripe
- if (included == null) {
- byte[] buffer =
- new byte[(int) (stripe.getDataLength())];
- file.seek(offset + stripe.getIndexLength());
- file.readFully(buffer, 0, buffer.length);
- int sectionOffset = 0;
- for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
- if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
- int sectionLength = (int) section.getLength();
- ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
- sectionLength);
- StreamName name = new StreamName(section.getColumn(),
- section.getKind());
- streams.put(name,
- InStream.create(name.toString(), sectionBuffer, codec,
- bufferSize));
- sectionOffset += sectionLength;
- }
+ /**
+ * Given a point and min and max, determine if the point is before, at the
+ * min, in the middle, at the max, or after the range.
+ * @param point the point to test
+ * @param min the minimum point
+ * @param max the maximum point
+ * @param <T> the type of the comparison
+ * @return the location of the point
+ */
+ static <T> Location compareToRange(Comparable<T> point, T min, T max) {
+ int minCompare = point.compareTo(min);
+ if (minCompare < 0) {
+ return Location.BEFORE;
+ } else if (minCompare == 0) {
+ return Location.MIN;
+ }
+ int maxCompare = point.compareTo(max);
+ if (maxCompare > 0) {
+ return Location.AFTER;
+ } else if (maxCompare == 0) {
+ return Location.MAX;
+ }
+ return Location.MIDDLE;
+ }
+
+ /**
+ * Get the minimum value out of an index entry.
+ * @param index the index entry
+ * @return the object for the minimum value or null if there isn't one
+ */
+ static Object getMin(OrcProto.ColumnStatistics index) {
+ if (index.hasIntStatistics()) {
+ OrcProto.IntegerStatistics stat = index.getIntStatistics();
+ if (stat.hasMinimum()) {
+ return stat.getMinimum();
}
- } else {
- List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
- // the index of the current section
- int currentSection = 0;
- while (currentSection < streamList.size() &&
- StreamName.getArea(streamList.get(currentSection).getKind()) !=
- StreamName.Area.DATA) {
- currentSection += 1;
- }
- // byte position of the current section relative to the stripe start
- long sectionOffset = stripe.getIndexLength();
- while (currentSection < streamList.size()) {
- int bytes = 0;
-
- // find the first section that shouldn't be read
- int excluded=currentSection;
- while (excluded < streamList.size() &&
- included[streamList.get(excluded).getColumn()]) {
- bytes += streamList.get(excluded).getLength();
- excluded += 1;
- }
-
- // actually read the bytes as a big chunk
- if (bytes != 0) {
- byte[] buffer = new byte[bytes];
- file.seek(offset + sectionOffset);
- file.readFully(buffer, 0, bytes);
- sectionOffset += bytes;
-
- // create the streams for the sections we just read
- bytes = 0;
- while (currentSection < excluded) {
- OrcProto.Stream section = streamList.get(currentSection);
- StreamName name =
- new StreamName(section.getColumn(), section.getKind());
- this.streams.put(name,
- InStream.create(name.toString(),
- ByteBuffer.wrap(buffer, bytes,
- (int) section.getLength()), codec, bufferSize));
- currentSection += 1;
- bytes += section.getLength();
+ }
+ if (index.hasStringStatistics()) {
+ OrcProto.StringStatistics stat = index.getStringStatistics();
+ if (stat.hasMinimum()) {
+ return stat.getMinimum();
+ }
+ }
+ if (index.hasDoubleStatistics()) {
+ OrcProto.DoubleStatistics stat = index.getDoubleStatistics();
+ if (stat.hasMinimum()) {
+ return stat.getMinimum();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get the maximum value out of an index entry.
+ * @param index the index entry
+ * @return the object for the maximum value or null if there isn't one
+ */
+ static Object getMax(OrcProto.ColumnStatistics index) {
+ if (index.hasIntStatistics()) {
+ OrcProto.IntegerStatistics stat = index.getIntStatistics();
+ if (stat.hasMaximum()) {
+ return stat.getMaximum();
+ }
+ }
+ if (index.hasStringStatistics()) {
+ OrcProto.StringStatistics stat = index.getStringStatistics();
+ if (stat.hasMaximum()) {
+ return stat.getMaximum();
+ }
+ }
+ if (index.hasDoubleStatistics()) {
+ OrcProto.DoubleStatistics stat = index.getDoubleStatistics();
+ if (stat.hasMaximum()) {
+ return stat.getMaximum();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
+ * that is referenced in the predicate.
+ * @param index the statistics for the column mentioned in the predicate
+ * @param predicate the leaf predicate we need to evaluate
+ * @return the set of truth values that may be returned for the given
+ * predicate.
+ */
+ static TruthValue evaluatePredicate(OrcProto.ColumnStatistics index,
+ PredicateLeaf predicate) {
+ Object minValue = getMin(index);
+ // if we didn't have any values, everything must have been null
+ if (minValue == null) {
+ if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+ return TruthValue.YES;
+ } else {
+ return TruthValue.NULL;
+ }
+ }
+ Object maxValue = getMax(index);
+ Location loc;
+ switch (predicate.getOperator()) {
+ case NULL_SAFE_EQUALS:
+ loc = compareToRange((Comparable) predicate.getLiteral(),
+ minValue, maxValue);
+ if (loc == Location.BEFORE || loc == Location.AFTER) {
+ return TruthValue.NO;
+ } else {
+ return TruthValue.YES_NO;
+ }
+ case EQUALS:
+ loc = compareToRange((Comparable) predicate.getLiteral(),
+ minValue, maxValue);
+ if (minValue.equals(maxValue) && loc == Location.MIN) {
+ return TruthValue.YES_NULL;
+ } else if (loc == Location.BEFORE || loc == Location.AFTER) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
+ }
+ case LESS_THAN:
+ loc = compareToRange((Comparable) predicate.getLiteral(),
+ minValue, maxValue);
+ if (loc == Location.AFTER) {
+ return TruthValue.YES_NULL;
+ } else if (loc == Location.BEFORE || loc == Location.MIN) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
+ }
+ case LESS_THAN_EQUALS:
+ loc = compareToRange((Comparable) predicate.getLiteral(),
+ minValue, maxValue);
+ if (loc == Location.AFTER || loc == Location.MAX) {
+ return TruthValue.YES_NULL;
+ } else if (loc == Location.BEFORE) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
+ }
+ case IN:
+ if (minValue.equals(maxValue)) {
+ // for a single value, look through to see if that value is in the
+ // set
+ for(Object arg: predicate.getLiteralList()) {
+ loc = compareToRange((Comparable) arg, minValue, maxValue);
+ if (loc == Location.MIN) {
+ return TruthValue.YES_NULL;
+ }
+ }
+ return TruthValue.NO_NULL;
+ } else {
+ // are all of the values outside of the range?
+ for(Object arg: predicate.getLiteralList()) {
+ loc = compareToRange((Comparable) arg, minValue, maxValue);
+ if (loc == Location.MIN || loc == Location.MIDDLE ||
+ loc == Location.MAX) {
+ return TruthValue.YES_NO_NULL;
+ }
}
+ return TruthValue.NO_NULL;
}
-
- // skip forward until we get back to a section that we need
- while (currentSection < streamList.size() &&
- !included[streamList.get(currentSection).getColumn()]) {
- sectionOffset += streamList.get(currentSection).getLength();
- currentSection += 1;
+ case BETWEEN:
+ List<Object> args = predicate.getLiteralList();
+ loc = compareToRange((Comparable) args.get(0), minValue, maxValue);
+ if (loc == Location.BEFORE || loc == Location.MIN) {
+ Location loc2 = compareToRange((Comparable) args.get(1), minValue,
+ maxValue);
+ if (loc2 == Location.AFTER || loc2 == Location.MAX) {
+ return TruthValue.YES_NULL;
+ } else if (loc2 == Location.BEFORE) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
+ }
+ } else if (loc == Location.AFTER) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
}
+ case IS_NULL:
+ return TruthValue.YES_NO;
+ default:
+ return TruthValue.YES_NO_NULL;
+ }
+ }
+
+ /**
+ * Pick the row groups that we need to load from the current stripe.
+ * @return an array with a boolean for each row group or null if all of the
+ * row groups must be read.
+ * @throws IOException
+ */
+ private boolean[] pickRowGroups() throws IOException {
+ // if we don't have a sarg or indexes, we read everything
+ if (sarg == null || rowIndexStride == 0) {
+ return null;
+ }
+ readRowIndex();
+ long rowsInStripe = stripes.get(currentStripe).getNumberOfRows();
+ int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) /
+ rowIndexStride);
+ boolean[] result = new boolean[groupsInStripe];
+ TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+ for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
+ for(int pred=0; pred < leafValues.length; ++pred) {
+ OrcProto.ColumnStatistics stats =
+ indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
+ leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stats = " + stats);
+ LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
+ leafValues[pred]);
+ }
+ }
+ result[rowGroup] = sarg.evaluate(leafValues).isNotNeeded();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+ (rowIndexStride * (rowGroup+1) - 1) + " is " +
+ (result[rowGroup] ? "" : "not ") + "included.");
}
}
- reader.startStripe(streams, stripeFooter.getColumnsList());
- rowInStripe = 0;
+
+ // if we found something to skip, use the array. otherwise, return null.
+ for(boolean b: result) {
+ if (!b) {
+ return result;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Read the current stripe into memory.
+ * @throws IOException
+ */
+ private void readStripe() throws IOException {
+ StripeInformation stripe = stripes.get(currentStripe);
+ stripeFooter = readStripeFooter(stripe);
+ streams.clear();
+ // setup the position in the stripe
rowCountInStripe = stripe.getNumberOfRows();
+ rowInStripe = 0;
rowBaseInStripe = 0;
for(int i=0; i < currentStripe; ++i) {
rowBaseInStripe += stripes.get(i).getNumberOfRows();
}
+ // reset all of the indexes
for(int i=0; i < indexes.length; ++i) {
indexes[i] = null;
}
+ includedRowGroups = pickRowGroups();
+
+ // move forward to the first unskipped row
+ if (includedRowGroups != null) {
+ while (rowInStripe < rowCountInStripe &&
+ !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
+ rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
+ }
+ }
+
+ // if we haven't skipped the whole stripe, read the data
+ if (rowInStripe < rowCountInStripe) {
+ // if we aren't projecting columns or filtering rows, just read it all
+ if (included == null && includedRowGroups == null) {
+ readAllDataStreams(stripe);
+ } else {
+ readPartialDataStreams(stripe);
+ }
+ reader.startStripe(streams, stripeFooter.getColumnsList());
+ // if we skipped the first row group, move the pointers forward
+ if (rowInStripe != 0) {
+ seekToRowEntry((int) (rowInStripe / rowIndexStride));
+ }
+ }
+ }
+
+ private void readAllDataStreams(StripeInformation stripe
+ ) throws IOException {
+ byte[] buffer =
+ new byte[(int) (stripe.getDataLength())];
+ file.seek(stripe.getOffset() + stripe.getIndexLength());
+ file.readFully(buffer, 0, buffer.length);
+ int sectionOffset = 0;
+ for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
+ if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
+ int sectionLength = (int) section.getLength();
+ ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
+ sectionLength);
+ StreamName name = new StreamName(section.getColumn(),
+ section.getKind());
+ streams.put(name,
+ InStream.create(name.toString(), new ByteBuffer[]{sectionBuffer},
+ new long[]{0}, sectionLength, codec, bufferSize));
+ sectionOffset += sectionLength;
+ }
+ }
+ }
+
+ /**
+ * The sections of the stripe that we need to read.
+ */
+ static class DiskRange {
+ /** the first address we need to read. */
+ long offset;
+ /** the first address afterwards. */
+ long end;
+
+ DiskRange(long offset, long end) {
+ this.offset = offset;
+ this.end = end;
+ if (end < offset) {
+ throw new IllegalArgumentException("invalid range " + this);
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || other.getClass() != getClass()) {
+ return false;
+ }
+ DiskRange otherR = (DiskRange) other;
+ return otherR.offset == offset && otherR.end == end;
+ }
+
+ @Override
+ public String toString() {
+ return "range start: " + offset + " end: " + end;
+ }
+ }
+
+ private static final int BYTE_STREAM_POSITIONS = 1;
+ private static final int RUN_LENGTH_BYTE_POSITIONS =
+ BYTE_STREAM_POSITIONS + 1;
+ private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
+ private static final int RUN_LENGTH_INT_POSITIONS =
+ BYTE_STREAM_POSITIONS + 1;
+
+ /**
+ * Get the offset in the index positions for the column that the given
+ * stream starts.
+ * @param encoding the encoding of the column
+ * @param type the type of the column
+ * @param stream the kind of the stream
+ * @param isCompressed is the file compressed
+ * @param hasNulls does the column have a PRESENT stream?
+ * @return the number of positions that will be used for that stream
+ */
+ static int getIndexPosition(OrcProto.ColumnEncoding.Kind encoding,
+ OrcProto.Type.Kind type,
+ OrcProto.Stream.Kind stream,
+ boolean isCompressed,
+ boolean hasNulls) {
+ if (stream == OrcProto.Stream.Kind.PRESENT) {
+ return 0;
+ }
+ int compressionValue = isCompressed ? 1 : 0;
+ int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+ switch (type) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case STRUCT:
+ case MAP:
+ case LIST:
+ case UNION:
+ return base;
+ case STRING:
+ if (encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+ encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+ return base;
+ } else {
+ if (stream == OrcProto.Stream.Kind.DATA) {
+ return base;
+ } else {
+ return base + BYTE_STREAM_POSITIONS + compressionValue;
+ }
+ }
+ case BINARY:
+ if (stream == OrcProto.Stream.Kind.DATA) {
+ return base;
+ }
+ return base + BYTE_STREAM_POSITIONS + compressionValue;
+ case DECIMAL:
+ if (stream == OrcProto.Stream.Kind.DATA) {
+ return base;
+ }
+ return base + BYTE_STREAM_POSITIONS + compressionValue;
+ case TIMESTAMP:
+ if (stream == OrcProto.Stream.Kind.DATA) {
+ return base;
+ }
+ return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+ default:
+ throw new IllegalArgumentException("Unknown type " + type);
+ }
+ }
+
+ // for uncompressed streams, what is the most overlap with the following set
+ // of rows (long vint literal group).
+ static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+
+ /**
+ * Is this stream part of a dictionary?
+ * @return is this part of a dictionary?
+ */
+ static boolean isDictionary(OrcProto.Stream.Kind kind,
+ OrcProto.ColumnEncoding encoding) {
+ OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+ return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
+ (kind == OrcProto.Stream.Kind.LENGTH &&
+ (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+ encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+ }
+
+ /**
+ * Plan the ranges of the file that we need to read given the list of
+ * columns and row groups.
+ * @param streamList the list of streams available
+ * @param indexes the indexes that have been loaded
+ * @param includedColumns which columns are needed
+ * @param includedRowGroups which row groups are needed
+ * @param isCompressed does the file have generic compression
+ * @param encodings the encodings for each column
+ * @param types the types of the columns
+ * @param compressionSize the compression block size
+ * @return the list of disk ranges that will be loaded
+ */
+ static List<DiskRange> planReadPartialDataStreams
+ (List<OrcProto.Stream> streamList,
+ OrcProto.RowIndex[] indexes,
+ boolean[] includedColumns,
+ boolean[] includedRowGroups,
+ boolean isCompressed,
+ List<OrcProto.ColumnEncoding> encodings,
+ List<OrcProto.Type> types,
+ int compressionSize) {
+ List<DiskRange> result = new ArrayList<DiskRange>();
+ long offset = 0;
+ // figure out which columns have a present stream
+ boolean[] hasNull = new boolean[types.size()];
+ for(OrcProto.Stream stream: streamList) {
+ if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
+ hasNull[stream.getColumn()] = true;
+ }
+ }
+ for(OrcProto.Stream stream: streamList) {
+ long length = stream.getLength();
+ int column = stream.getColumn();
+ OrcProto.Stream.Kind streamKind = stream.getKind();
+ if (StreamName.getArea(streamKind) == StreamName.Area.DATA &&
+ includedColumns[column]) {
+ // if we aren't filtering or it is a dictionary, load it.
+ if (includedRowGroups == null ||
+ isDictionary(streamKind, encodings.get(column))) {
+ result.add(new DiskRange(offset, offset + length));
+ } else {
+ for(int group=0; group < includedRowGroups.length; ++group) {
+ if (includedRowGroups[group]) {
+ int posn = getIndexPosition(encodings.get(column).getKind(),
+ types.get(column).getKind(), stream.getKind(), isCompressed,
+ hasNull[column]);
+ long start = indexes[column].getEntry(group).getPositions(posn);
+ // figure out the worst case last location
+ long end = (group == includedRowGroups.length - 1) ?
+ length : Math.min(length,
+ indexes[column].getEntry(group + 1)
+ .getPositions(posn)
+ + (isCompressed ?
+ (OutStream.HEADER_SIZE
+ + compressionSize) :
+ WORST_UNCOMPRESSED_SLOP));
+ result.add(new DiskRange(offset + start, offset + end));
+ }
+ }
+ }
+ }
+ offset += length;
+ }
+ return result;
+ }
+
+ /**
+ * Update the disk ranges to collapse adjacent or overlapping ranges. It
+ * assumes that the ranges are sorted.
+ * @param ranges the list of disk ranges to merge
+ */
+ static void mergeDiskRanges(List<DiskRange> ranges) {
+ DiskRange prev = null;
+ for(int i=0; i < ranges.size(); ++i) {
+ DiskRange current = ranges.get(i);
+ if (prev != null && overlap(prev.offset, prev.end,
+ current.offset, current.end)) {
+ prev.offset = Math.min(prev.offset, current.offset);
+ prev.end = Math.max(prev.end, current.end);
+ ranges.remove(i);
+ i -= 1;
+ } else {
+ prev = current;
+ }
+ }
+ }
+
+ /**
+ * Read the list of ranges from the file.
+ * @param file the file to read
+ * @param base the base of the stripe
+ * @param ranges the disk ranges within the stripe to read
+ * @return the bytes read for each disk range, which is the same length as
+ * ranges
+ * @throws IOException
+ */
+ static byte[][] readDiskRanges(FSDataInputStream file,
+ long base,
+ List<DiskRange> ranges) throws IOException {
+ byte[][] result = new byte[ranges.size()][];
+ int i = 0;
+ for(DiskRange range: ranges) {
+ int len = (int) (range.end - range.offset);
+ result[i] = new byte[len];
+ file.seek(base + range.offset);
+ file.readFully(result[i]);
+ i += 1;
+ }
+ return result;
+ }
+
+ /**
+ * Does region A overlap region B? The end points are inclusive on both sides.
+ * @param leftA A's left point
+ * @param rightA A's right point
+ * @param leftB B's left point
+ * @param rightB B's right point
+ * @return Does region A overlap region B?
+ */
+ static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+ if (leftA <= leftB) {
+ return rightA >= leftB;
+ }
+ return rightB >= leftA;
+ }
+
+ /**
+ * Build a string representation of a list of disk ranges.
+ * @param ranges ranges to stringify
+ * @return the resulting string
+ */
+ static String stringifyDiskRanges(List<DiskRange> ranges) {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("[");
+ for(int i=0; i < ranges.size(); ++i) {
+ if (i != 0) {
+ buffer.append(", ");
+ }
+ buffer.append(ranges.get(i).toString());
+ }
+ buffer.append("]");
+ return buffer.toString();
+ }
+
+ static void createStreams(List<OrcProto.Stream> streamDescriptions,
+ List<DiskRange> ranges,
+ byte[][] bytes,
+ boolean[] includeColumn,
+ CompressionCodec codec,
+ int bufferSize,
+ Map<StreamName, InStream> streams
+ ) throws IOException {
+ long offset = 0;
+ for(OrcProto.Stream streamDesc: streamDescriptions) {
+ int column = streamDesc.getColumn();
+ if (includeColumn[column] &&
+ StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) {
+ long length = streamDesc.getLength();
+ int first = -1;
+ int last = -2;
+ for(int i=0; i < bytes.length; ++i) {
+ DiskRange range = ranges.get(i);
+ if (overlap(offset, offset+length, range.offset, range.end)) {
+ if (first == -1) {
+ first = i;
+ }
+ last = i;
+ }
+ }
+ ByteBuffer[] buffers = new ByteBuffer[last - first + 1];
+ long[] offsets = new long[last - first + 1];
+ for(int i=0; i < buffers.length; ++i) {
+ DiskRange range = ranges.get(i + first);
+ long start = Math.max(range.offset, offset);
+ long end = Math.min(range.end, offset+length);
+ buffers[i] = ByteBuffer.wrap(bytes[first + i],
+ Math.max(0, (int) (offset - range.offset)), (int) (end - start));
+ offsets[i] = Math.max(0, range.offset - offset);
+ }
+ StreamName name = new StreamName(column, streamDesc.getKind());
+ streams.put(name, InStream.create(name.toString(), buffers, offsets,
+ length, codec, bufferSize));
+ }
+ offset += streamDesc.getLength();
+ }
+ }
+
+ private void readPartialDataStreams(StripeInformation stripe
+ ) throws IOException {
+ List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+ List<DiskRange> chunks =
+ planReadPartialDataStreams(streamList,
+ indexes, included, includedRowGroups, codec != null,
+ stripeFooter.getColumnsList(), types, bufferSize);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("chunks = " + stringifyDiskRanges(chunks));
+ }
+ mergeDiskRanges(chunks);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("merge = " + stringifyDiskRanges(chunks));
+ }
+ byte[][] bytes = readDiskRanges(file, stripe.getOffset(), chunks);
+ createStreams(streamList, chunks, bytes, included, codec, bufferSize,
+ streams);
}
@Override
public boolean hasNext() throws IOException {
- return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1;
+ return rowInStripe < rowCountInStripe;
}
- @Override
- public Object next(Object previous) throws IOException {
- if (rowInStripe >= rowCountInStripe) {
+ /**
+ * Read the next stripe until we find a row that we don't skip.
+ * @throws IOException
+ */
+ private void advanceStripe() throws IOException {
+ rowInStripe = rowCountInStripe;
+ while (rowInStripe >= rowCountInStripe &&
+ currentStripe < stripes.size() - 1) {
currentStripe += 1;
readStripe();
}
+ }
+
+ /**
+ * Skip over rows that we aren't selecting, so that the next row is
+ * one that we will read.
+ * @param nextRow the row we want to go to
+ * @throws IOException
+ */
+ private void advanceToNextRow(long nextRow) throws IOException {
+ long nextRowInStripe = nextRow - rowBaseInStripe;
+ // check for row skipping
+ if (rowIndexStride != 0 &&
+ includedRowGroups != null &&
+ nextRowInStripe < rowCountInStripe) {
+ int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+ if (!includedRowGroups[rowGroup]) {
+ while (rowGroup < includedRowGroups.length &&
+ !includedRowGroups[rowGroup]) {
+ rowGroup += 1;
+ }
+ // if we are off the end of the stripe, just move stripes
+ if (rowGroup >= includedRowGroups.length) {
+ advanceStripe();
+ return;
+ }
+ nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
+ }
+ }
+ if (nextRowInStripe < rowCountInStripe) {
+ if (nextRowInStripe != rowInStripe) {
+ if (rowIndexStride != 0) {
+ int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+ seekToRowEntry(rowGroup);
+ reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+ } else {
+ reader.skipRows(nextRowInStripe - rowInStripe);
+ }
+ rowInStripe = nextRowInStripe;
+ }
+ } else {
+ advanceStripe();
+ }
+ }
+
+ @Override
+ public Object next(Object previous) throws IOException {
+ Object result = reader.next(previous);
+ // find the next row
rowInStripe += 1;
- return reader.next(previous);
+ advanceToNextRow(rowInStripe + rowBaseInStripe);
+ return result;
}
@Override
@@ -1548,14 +2182,6 @@ class RecordReaderImpl implements Record
}
private int findStripe(long rowNumber) {
- if (rowNumber < 0) {
- throw new IllegalArgumentException("Seek to a negative row number " +
- rowNumber);
- } else if (rowNumber < firstRow) {
- throw new IllegalArgumentException("Seek before reader range " +
- rowNumber);
- }
- rowNumber -= firstRow;
for(int i=0; i < stripes.size(); i++) {
StripeInformation stripe = stripes.get(i);
if (stripe.getNumberOfRows() > rowNumber) {
@@ -1576,7 +2202,8 @@ class RecordReaderImpl implements Record
file.seek(offset);
file.readFully(buffer);
indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
- ByteBuffer.wrap(buffer), codec, bufferSize));
+ new ByteBuffer[] {ByteBuffer.wrap(buffer)}, new long[]{0},
+ stream.getLength(), codec, bufferSize));
}
}
offset += stream.getLength();
@@ -1596,19 +2223,25 @@ class RecordReaderImpl implements Record
@Override
public void seekToRow(long rowNumber) throws IOException {
+ if (rowNumber < 0) {
+ throw new IllegalArgumentException("Seek to a negative row number " +
+ rowNumber);
+ } else if (rowNumber < firstRow) {
+ throw new IllegalArgumentException("Seek before reader range " +
+ rowNumber);
+ }
+ // convert to our internal form (rows from the beginning of the slice)
+ rowNumber -= firstRow;
+
+ // move to the right stripe
int rightStripe = findStripe(rowNumber);
if (rightStripe != currentStripe) {
currentStripe = rightStripe;
readStripe();
}
readRowIndex();
- rowInStripe = rowNumber - rowBaseInStripe - firstRow;
- if (rowIndexStride != 0) {
- long entry = rowInStripe / rowIndexStride;
- seekToRowEntry((int) entry);
- reader.skipRows(rowInStripe - entry * rowIndexStride);
- } else {
- reader.skipRows(rowInStripe);
- }
+
+ // if we aren't at the right row yet, advance within the stripe.
+ advanceToNextRow(rowNumber);
}
}
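[Editor's note, not part of the patch: the row-group skipping that advanceToNextRow() performs above can be read in isolation as the small helper below. This is a minimal sketch; the method name skipFilteredRowGroups and the -1 return convention (meaning "the rest of the stripe is filtered out, advance to the next stripe") are illustrative, while the field names mirror those used in RecordReaderImpl. It assumes nextRowInStripe < rowCountInStripe.]

    // Illustrative sketch only: the row-group skip done by advanceToNextRow().
    // Returns the row within the stripe to position on, or -1 when every
    // remaining row group in the stripe was ruled out by the predicate.
    static long skipFilteredRowGroups(long nextRowInStripe, long rowCountInStripe,
                                      int rowIndexStride,
                                      boolean[] includedRowGroups) {
      if (rowIndexStride == 0 || includedRowGroups == null) {
        return nextRowInStripe;                    // no row-group filtering active
      }
      int rowGroup = (int) (nextRowInStripe / rowIndexStride);
      if (includedRowGroups[rowGroup]) {
        return nextRowInStripe;                    // current row group is selected
      }
      while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
        rowGroup += 1;                             // skip groups the predicate excluded
      }
      if (rowGroup >= includedRowGroups.length) {
        return -1;                                 // nothing left in this stripe
      }
      return Math.min(rowCountInStripe, (long) rowGroup * rowIndexStride);
    }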
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java Thu Aug 15 19:05:35 2013
@@ -57,7 +57,7 @@ class RunLengthByteReader {
while (bytes < numLiterals) {
int result = input.read(literals, bytes, numLiterals - bytes);
if (result == -1) {
- throw new EOFException("Reading RLE byte literal got EOF");
+ throw new EOFException("Reading RLE byte literal got EOF in " + this);
}
bytes += result;
}
@@ -108,4 +108,10 @@ class RunLengthByteReader {
items -= consume;
}
}
+
+ @Override
+ public String toString() {
+ return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
+ used + "/" + numLiterals + " from " + input;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java Thu Aug 15 19:05:35 2013
@@ -134,6 +134,21 @@ public interface SearchArgument {
throw new IllegalArgumentException("Unknown value: " + this);
}
}
+
+ /**
+ * Does the RecordReader need to include this set of records?
+ * @return true unless none of the rows qualify
+ */
+ public boolean isNotNeeded() {
+ switch (this) {
+ case NO:
+ case NULL:
+ case NO_NULL:
+ return false;
+ default:
+ return true;
+ }
+ }
}
/**
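[Editor's note, not part of the patch: a hedged usage sketch of the new TruthValue.isNotNeeded() check, showing how a per-row-group truth value could be turned into an inclusion mask. The class RowGroupSelectionSketch, the helper pickRowGroups, and the truthPerGroup input are hypothetical; only isNotNeeded() comes from the change above.]

    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;

    class RowGroupSelectionSketch {
      // Keep each row group unless its predicate result was NO, NULL, or
      // NO_NULL, i.e. unless no row in the group can possibly qualify.
      static boolean[] pickRowGroups(TruthValue[] truthPerGroup) {
        boolean[] included = new boolean[truthPerGroup.length];
        for (int g = 0; g < truthPerGroup.length; ++g) {
          included[g] = truthPerGroup[g].isNotNeeded();
        }
        return included;
      }
    }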
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java Thu Aug 15 19:05:35 2013
@@ -36,16 +36,12 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
@@ -57,7 +53,7 @@ import java.util.Map;
*/
final class SearchArgumentImpl implements SearchArgument {
- private static final class PredicateLeafImpl implements PredicateLeaf {
+ static final class PredicateLeafImpl implements PredicateLeaf {
private final Operator operator;
private final Type type;
private final String columnName;
@@ -270,7 +266,6 @@ final class SearchArgumentImpl implement
}
static class ExpressionBuilder {
- private ExpressionTree expression = null;
private final List<PredicateLeaf> leaves = new ArrayList<PredicateLeaf>();
/**
@@ -321,11 +316,11 @@ final class SearchArgumentImpl implement
private static Object boxLiteral(ExprNodeConstantDesc lit) {
switch (getType(lit)) {
case INTEGER:
- return new LongWritable(((Number) lit.getValue()).longValue());
+ return ((Number) lit.getValue()).longValue();
case STRING:
- return new Text(lit.getValue().toString());
+ return lit.getValue().toString();
case FLOAT:
- return new DoubleWritable(((Number) lit.getValue()).doubleValue());
+ return ((Number) lit.getValue()).doubleValue();
default:
throw new IllegalArgumentException("Unknown literal " + getType(lit));
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Thu Aug 15 19:05:35 2013
@@ -309,6 +309,7 @@ public final class ColumnPrunerProcFacto
cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd,
cols);
ArrayList<Integer> needed_columns = new ArrayList<Integer>();
+ List<String> neededColumnNames = new ArrayList<String>();
RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
TableScanDesc desc = scanOp.getConf();
List<VirtualColumn> virtualCols = desc.getVirtualCols();
@@ -339,12 +340,15 @@ public final class ColumnPrunerProcFacto
}
int position = inputRR.getPosition(cols.get(i));
if (position >=0) {
+ // get the needed columns by id and name
needed_columns.add(position);
+ neededColumnNames.add(cols.get(i));
}
}
desc.setVirtualCols(newVirtualCols);
scanOp.setNeededColumnIDs(needed_columns);
+ scanOp.setNeededColumns(neededColumnNames);
return null;
}
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java Thu Aug 15 19:05:35 2013
@@ -47,7 +47,8 @@ public class TestBitFieldReader {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- BitFieldReader in = new BitFieldReader(InStream.create("test", inBuf,
+ BitFieldReader in = new BitFieldReader(InStream.create("test",
+ new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
codec, 500), 1);
for(int i=0; i < COUNT; ++i) {
int x = in.next();
@@ -96,7 +97,8 @@ public class TestBitFieldReader {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- BitFieldReader in = new BitFieldReader(InStream.create("test", inBuf,
+ BitFieldReader in = new BitFieldReader(InStream.create("test",
+ new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
null, 500), 3);
for(int i=0; i < COUNT; ++i) {
int x = in.next();
@@ -126,7 +128,8 @@ public class TestBitFieldReader {
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
BitFieldReader in = new BitFieldReader(InStream.create
- ("test", inBuf, null, 100), 1);
+ ("test", new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
+ null, 100), 1);
for(int i=0; i < COUNT; i += 5) {
int x = (int) in.next();
if (i < COUNT/2) {
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java Thu Aug 15 19:05:35 2013
@@ -84,7 +84,11 @@ public class TestBitPack {
inBuf.flip();
long[] buff = new long[SIZE];
SerializationUtils.readInts(buff, 0, SIZE, fixedWidth,
- InStream.create("test", inBuf, null, SIZE));
+ InStream.create("test",
+ new ByteBuffer[]{inBuf},
+ new long[]{0},
+ inBuf.remaining(),
+ null, SIZE));
for(int i = 0; i < SIZE; i++) {
buff[i] = SerializationUtils.zigzagDecode(buff[i]);
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java Thu Aug 15 19:05:35 2013
@@ -20,6 +20,9 @@ package org.apache.hadoop.hive.ql.io.orc
import org.junit.Test;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -40,7 +43,8 @@ public class TestInStream {
}
}
- static class PositionCollector implements PositionProvider, PositionRecorder {
+ static class PositionCollector
+ implements PositionProvider, PositionRecorder {
private List<Long> positions = new ArrayList<Long>();
private int index = 0;
@@ -53,6 +57,22 @@ public class TestInStream {
public void addPosition(long offset) {
positions.add(offset);
}
+
+ public void reset() {
+ index = 0;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder("position: ");
+ for(int i=0; i < positions.size(); ++i) {
+ if (i != 0) {
+ builder.append(", ");
+ }
+ builder.append(positions.get(i));
+ }
+ return builder.toString();
+ }
}
@Test
@@ -73,9 +93,11 @@ public class TestInStream {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- InStream in = InStream.create("test", inBuf, null, 100);
- assertEquals("uncompressed stream test base: 0 offset: 0 limit: 1024",
- in.toString());
+ InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
+ new long[]{0}, inBuf.remaining(), null, 100);
+ assertEquals("uncompressed stream test position: 0 length: 1024" +
+ " range: 0 offset: 0 limit: 0",
+ in.toString());
for(int i=0; i < 1024; ++i) {
int x = in.read();
assertEquals(i & 0xff, x);
@@ -103,9 +125,11 @@ public class TestInStream {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- InStream in = InStream.create("test", inBuf, codec, 300);
- assertEquals("compressed stream test base: 0 offset: 0 limit: 961",
- in.toString());
+ InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
+ new long[]{0}, inBuf.remaining(), codec, 300);
+ assertEquals("compressed stream test position: 0 length: 961 range: 0" +
+ " offset: 0 limit: 0 range 0 = 0 to 961",
+ in.toString());
for(int i=0; i < 1024; ++i) {
int x = in.read();
assertEquals(i & 0xff, x);
@@ -134,7 +158,8 @@ public class TestInStream {
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- InStream in = InStream.create("test", inBuf, codec, 100);
+ InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
+ new long[]{0}, inBuf.remaining(), codec, 100);
byte[] contents = new byte[1024];
try {
in.read(contents);
@@ -148,7 +173,8 @@ public class TestInStream {
inBuf.put((byte) 32);
inBuf.put((byte) 0);
inBuf.flip();
- in = InStream.create("test2", inBuf, codec, 300);
+ in = InStream.create("test2", new ByteBuffer[]{inBuf}, new long[]{0},
+ inBuf.remaining(), codec, 300);
try {
in.read();
fail();
@@ -156,4 +182,132 @@ public class TestInStream {
// EXPECTED
}
}
+
+ @Test
+ public void testDisjointBuffers() throws Exception {
+ OutputCollector collect = new OutputCollector();
+ CompressionCodec codec = new ZlibCodec();
+ OutStream out = new OutStream("test", 400, codec, collect);
+ PositionCollector[] positions = new PositionCollector[1024];
+ DataOutput stream = new DataOutputStream(out);
+ for(int i=0; i < 1024; ++i) {
+ positions[i] = new PositionCollector();
+ out.getPosition(positions[i]);
+ stream.writeInt(i);
+ }
+ out.flush();
+ assertEquals("test", out.toString());
+ assertEquals(1674, collect.buffer.size());
+ ByteBuffer[] inBuf = new ByteBuffer[3];
+ inBuf[0] = ByteBuffer.allocate(500);
+ inBuf[1] = ByteBuffer.allocate(1200);
+ inBuf[2] = ByteBuffer.allocate(500);
+ collect.buffer.setByteBuffer(inBuf[0], 0, 483);
+ collect.buffer.setByteBuffer(inBuf[1], 483, 1625 - 483);
+ collect.buffer.setByteBuffer(inBuf[2], 1625, 1674 - 1625);
+
+ for(int i=0; i < inBuf.length; ++i) {
+ inBuf[i].flip();
+ }
+ InStream in = InStream.create("test", inBuf,
+ new long[]{0,483, 1625}, 1674, codec, 400);
+ assertEquals("compressed stream test position: 0 length: 1674 range: 0" +
+ " offset: 0 limit: 0 range 0 = 0 to 483;" +
+ " range 1 = 483 to 1142; range 2 = 1625 to 49",
+ in.toString());
+ DataInputStream inStream = new DataInputStream(in);
+ for(int i=0; i < 1024; ++i) {
+ int x = inStream.readInt();
+ assertEquals(i, x);
+ }
+ assertEquals(0, in.available());
+ for(int i=1023; i >= 0; --i) {
+ in.seek(positions[i]);
+ assertEquals(i, inStream.readInt());
+ }
+
+ in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
+ new long[]{483, 1625}, 1674, codec, 400);
+ inStream = new DataInputStream(in);
+ positions[303].reset();
+ in.seek(positions[303]);
+ for(int i=303; i < 1024; ++i) {
+ assertEquals(i, inStream.readInt());
+ }
+
+ in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
+ new long[]{0, 1625}, 1674, codec, 400);
+ inStream = new DataInputStream(in);
+ positions[1001].reset();
+ for(int i=0; i < 300; ++i) {
+ assertEquals(i, inStream.readInt());
+ }
+ in.seek(positions[1001]);
+ for(int i=1001; i < 1024; ++i) {
+ assertEquals(i, inStream.readInt());
+ }
+ }
+
+ @Test
+ public void testUncompressedDisjointBuffers() throws Exception {
+ OutputCollector collect = new OutputCollector();
+ OutStream out = new OutStream("test", 400, null, collect);
+ PositionCollector[] positions = new PositionCollector[1024];
+ DataOutput stream = new DataOutputStream(out);
+ for(int i=0; i < 1024; ++i) {
+ positions[i] = new PositionCollector();
+ out.getPosition(positions[i]);
+ stream.writeInt(i);
+ }
+ out.flush();
+ assertEquals("test", out.toString());
+ assertEquals(4096, collect.buffer.size());
+ ByteBuffer[] inBuf = new ByteBuffer[3];
+ inBuf[0] = ByteBuffer.allocate(1100);
+ inBuf[1] = ByteBuffer.allocate(2200);
+ inBuf[2] = ByteBuffer.allocate(1100);
+ collect.buffer.setByteBuffer(inBuf[0], 0, 1024);
+ collect.buffer.setByteBuffer(inBuf[1], 1024, 2048);
+ collect.buffer.setByteBuffer(inBuf[2], 3072, 1024);
+
+ for(int i=0; i < inBuf.length; ++i) {
+ inBuf[i].flip();
+ }
+ InStream in = InStream.create("test", inBuf,
+ new long[]{0, 1024, 3072}, 4096, null, 400);
+ assertEquals("uncompressed stream test position: 0 length: 4096" +
+ " range: 0 offset: 0 limit: 0",
+ in.toString());
+ DataInputStream inStream = new DataInputStream(in);
+ for(int i=0; i < 1024; ++i) {
+ int x = inStream.readInt();
+ assertEquals(i, x);
+ }
+ assertEquals(0, in.available());
+ for(int i=1023; i >= 0; --i) {
+ in.seek(positions[i]);
+ assertEquals(i, inStream.readInt());
+ }
+
+ in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
+ new long[]{1024, 3072}, 4096, null, 400);
+ inStream = new DataInputStream(in);
+ positions[256].reset();
+ in.seek(positions[256]);
+ for(int i=256; i < 1024; ++i) {
+ assertEquals(i, inStream.readInt());
+ }
+
+ in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
+ new long[]{0, 3072}, 4096, null, 400);
+ inStream = new DataInputStream(in);
+ positions[768].reset();
+ for(int i=0; i < 256; ++i) {
+ assertEquals(i, inStream.readInt());
+ }
+ in.seek(positions[768]);
+ for(int i=768; i < 1024; ++i) {
+ assertEquals(i, inStream.readInt());
+ }
+ }
}
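[Editor's note, not part of the patch: a minimal sketch of the new multi-range InStream.create() call exercised by the tests above, reading one stream from two non-adjacent byte ranges. The parameter meanings (one ByteBuffer per range, each range's starting offset within the stream, and the total stream length) are inferred from those tests; the class and helper names are illustrative, and the code assumes it lives in the org.apache.hadoop.hive.ql.io.orc package alongside InStream and CompressionCodec.]

    import java.io.IOException;
    import java.nio.ByteBuffer;

    class DisjointRangeSketch {
      // Build an InStream over two disjoint ranges of the same stream:
      // part0 starts at offset 0, part1 starts at part1Offset, and
      // totalLength is the full on-disk length of the stream.
      static InStream openTwoRanges(byte[] part0, byte[] part1, long part1Offset,
                                    long totalLength, CompressionCodec codec,
                                    int bufferSize) throws IOException {
        ByteBuffer[] ranges = {ByteBuffer.wrap(part0), ByteBuffer.wrap(part1)};
        long[] offsets = {0L, part1Offset};
        return InStream.create("sketch", ranges, offsets, totalLength,
                               codec, bufferSize);
      }
    }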
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java Thu Aug 15 19:05:35 2013
@@ -53,8 +53,11 @@ public class TestIntegerCompressionReade
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
- ("test", inBuf, codec, 1000), true);
+ RunLengthIntegerReaderV2 in =
+ new RunLengthIntegerReaderV2(InStream.create
+ ("test", new ByteBuffer[]{inBuf},
+ new long[]{0}, inBuf.remaining(),
+ codec, 1000), true);
for(int i=0; i < 2048; ++i) {
int x = (int) in.next();
if (i < 1024) {
@@ -104,8 +107,12 @@ public class TestIntegerCompressionReade
ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
inBuf.flip();
- RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
- ("test", inBuf, null, 100), true);
+ RunLengthIntegerReaderV2 in =
+ new RunLengthIntegerReaderV2(InStream.create("test",
+ new ByteBuffer[]{inBuf},
+ new long[]{0},
+ inBuf.remaining(),
+ null, 100), true);
for(int i=0; i < 2048; i += 10) {
int x = (int) in.next();
if (i < 1024) {