You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/26 23:42:21 UTC
svn commit: r1517707 [7/17] - in /hive/branches/tez: ./
beeline/src/java/org/apache/hive/beeline/ bin/ bin/ext/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/
contrib/src/java/org/apache/hadoop/hive/co...
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Mon Aug 26 21:42:12 2013
@@ -27,10 +27,18 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -43,6 +51,9 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
class RecordReaderImpl implements RecordReader {
+
+ private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
+
private final FSDataInputStream file;
private final long firstRow;
private final List<StripeInformation> stripes =
@@ -50,17 +61,25 @@ class RecordReaderImpl implements Record
private OrcProto.StripeFooter stripeFooter;
private final long totalRowCount;
private final CompressionCodec codec;
+ private final List<OrcProto.Type> types;
private final int bufferSize;
private final boolean[] included;
private final long rowIndexStride;
private long rowInStripe = 0;
- private int currentStripe = 0;
+ private int currentStripe = -1;
private long rowBaseInStripe = 0;
private long rowCountInStripe = 0;
private final Map<StreamName, InStream> streams =
new HashMap<StreamName, InStream>();
private final TreeReader reader;
private final OrcProto.RowIndex[] indexes;
+ private final SearchArgument sarg;
+ // the leaf predicates for the sarg
+ private final List<PredicateLeaf> sargLeaves;
+ // an array the same length as the sargLeaves that maps them to column ids
+ private final int[] filterColumns;
+ // an array about which row groups aren't skipped
+ private boolean[] includedRowGroups = null;
RecordReaderImpl(Iterable<StripeInformation> stripes,
FileSystem fileSystem,
@@ -70,12 +89,27 @@ class RecordReaderImpl implements Record
CompressionCodec codec,
int bufferSize,
boolean[] included,
- long strideRate
+ long strideRate,
+ SearchArgument sarg,
+ String[] columnNames
) throws IOException {
this.file = fileSystem.open(path);
this.codec = codec;
+ this.types = types;
this.bufferSize = bufferSize;
this.included = included;
+ this.sarg = sarg;
+ if (sarg != null) {
+ sargLeaves = sarg.getLeaves();
+ filterColumns = new int[sargLeaves.size()];
+ for(int i=0; i < filterColumns.length; ++i) {
+ String colName = sargLeaves.get(i).getColumnName();
+ filterColumns[i] = findColumns(columnNames, colName);
+ }
+ } else {
+ sargLeaves = null;
+ filterColumns = null;
+ }
long rows = 0;
long skippedRows = 0;
for(StripeInformation stripe: stripes) {
@@ -92,9 +126,17 @@ class RecordReaderImpl implements Record
reader = createTreeReader(path, 0, types, included);
indexes = new OrcProto.RowIndex[types.size()];
rowIndexStride = strideRate;
- if (this.stripes.size() > 0) {
- readStripe();
+ advanceToNextRow(0L);
+ }
+
+ private static int findColumns(String[] columnNames,
+ String columnName) {
+ for(int i=0; i < columnNames.length; ++i) {
+ if (columnName.equals(columnNames[i])) {
+ return i;
+ }
}
+ return -1;
}
private static final class PositionProviderImpl implements PositionProvider {
@@ -129,6 +171,21 @@ class RecordReaderImpl implements Record
}
}
+ IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
+ InStream in,
+ boolean signed) throws IOException {
+ switch (kind) {
+ case DIRECT_V2:
+ case DICTIONARY_V2:
+ return new RunLengthIntegerReaderV2(in, signed);
+ case DIRECT:
+ case DICTIONARY:
+ return new RunLengthIntegerReader(in, signed);
+ default:
+ throw new IllegalArgumentException("Unknown encoding " + kind);
+ }
+ }
+
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encoding
) throws IOException {
@@ -265,20 +322,29 @@ class RecordReaderImpl implements Record
}
private static class ShortTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
ShortTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -309,20 +375,29 @@ class RecordReaderImpl implements Record
}
private static class IntTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
IntTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -353,20 +428,29 @@ class RecordReaderImpl implements Record
}
private static class LongTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
LongTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -491,13 +575,22 @@ class RecordReaderImpl implements Record
private static class BinaryTreeReader extends TreeReader{
private InStream stream;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
BinaryTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
@@ -505,9 +598,8 @@ class RecordReaderImpl implements Record
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = streams.get(name);
- lengths = new RunLengthIntegerReader(streams.get(new
- StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
- false);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(), streams.get(new
+ StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false);
}
@Override
@@ -554,22 +646,33 @@ class RecordReaderImpl implements Record
}
private static class TimestampTreeReader extends TreeReader{
- private RunLengthIntegerReader data;
- private RunLengthIntegerReader nanos;
+ private IntegerReader data = null;
+ private IntegerReader nanos = null;
TimestampTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- data = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.DATA)), true);
- nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.SECONDARY)), false);
+ data = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA)), true);
+ nanos = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.SECONDARY)), false);
}
@Override
@@ -624,20 +727,29 @@ class RecordReaderImpl implements Record
}
private static class DateTreeReader extends TreeReader{
- private RunLengthIntegerReader reader = null;
+ private IntegerReader reader = null;
DateTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), true);
+ reader = createIntegerReader(encodings.get(columnId).getKind(), streams.get(name), true);
}
@Override
@@ -669,20 +781,29 @@ class RecordReaderImpl implements Record
private static class DecimalTreeReader extends TreeReader{
private InStream valueStream;
- private RunLengthIntegerReader scaleStream;
+ private IntegerReader scaleStream = null;
DecimalTreeReader(Path path, int columnId) {
super(path, columnId);
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
valueStream = streams.get(new StreamName(columnId,
OrcProto.Stream.Kind.DATA));
- scaleStream = new RunLengthIntegerReader(streams.get(
+ scaleStream = createIntegerReader(encodings.get(columnId).getKind(), streams.get(
new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
}
@@ -713,18 +834,156 @@ class RecordReaderImpl implements Record
}
}
+ /**
+ * A tree reader that will read string columns. At the start of the
+ * stripe, it creates an internal reader based on whether a direct or
+ * dictionary encoding was used.
+ */
private static class StringTreeReader extends TreeReader {
- private DynamicByteArray dictionaryBuffer = null;
- private int dictionarySize;
- private int[] dictionaryOffsets;
- private RunLengthIntegerReader reader;
+ private TreeReader reader;
StringTreeReader(Path path, int columnId) {
super(path, columnId);
}
+ @Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ reader.checkEncoding(encoding);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ // For each stripe, checks the encoding and initializes the appropriate
+ // reader
+ switch (encodings.get(columnId).getKind()) {
+ case DIRECT:
+ case DIRECT_V2:
+ reader = new StringDirectTreeReader(path, columnId);
+ break;
+ case DICTIONARY:
+ case DICTIONARY_V2:
+ reader = new StringDictionaryTreeReader(path, columnId);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported encoding " +
+ encodings.get(columnId).getKind());
+ }
+ reader.startStripe(streams, encodings);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ reader.seek(index);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ return reader.next(previous);
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skipRows(items);
+ }
+ }
+
+ /**
+ * A reader for string columns that are direct encoded in the current
+ * stripe.
+ */
+ private static class StringDirectTreeReader extends TreeReader {
+ private InStream stream;
+ private IntegerReader lengths;
+
+ StringDirectTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
+ encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ stream = streams.get(name);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
+ false);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ stream.seek(index[columnId]);
+ lengths.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Text result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new Text();
+ } else {
+ result = (Text) previous;
+ }
+ int len = (int) lengths.next();
+ int offset = 0;
+ byte[] bytes = new byte[len];
+ while (len > 0) {
+ int written = stream.read(bytes, offset, len);
+ if (written < 0) {
+ throw new EOFException("Can't finish byte read from " + stream);
+ }
+ len -= written;
+ offset += written;
+ }
+ result.set(bytes);
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ items = countNonNulls(items);
+ long lengthToSkip = 0;
+ for(int i=0; i < items; ++i) {
+ lengthToSkip += lengths.next();
+ }
+ stream.skip(lengthToSkip);
+ }
+ }
+
+ /**
+ * A reader for string columns that are dictionary encoded in the current
+ * stripe.
+ */
+ private static class StringDictionaryTreeReader extends TreeReader {
+ private DynamicByteArray dictionaryBuffer;
+ private int[] dictionaryOffsets;
+ private IntegerReader reader;
+
+ StringDictionaryTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
- if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY) {
+ if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
+ encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId + " of " + path);
}
@@ -737,7 +996,7 @@ class RecordReaderImpl implements Record
super.startStripe(streams, encodings);
// read the dictionary blob
- dictionarySize = encodings.get(columnId).getDictionarySize();
+ int dictionarySize = encodings.get(columnId).getDictionarySize();
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DICTIONARY_DATA);
InStream in = streams.get(name);
@@ -752,7 +1011,8 @@ class RecordReaderImpl implements Record
// read the lengths
name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
in = streams.get(name);
- RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
+ IntegerReader lenReader = createIntegerReader(encodings.get(columnId)
+ .getKind(), in, false);
int offset = 0;
if (dictionaryOffsets == null ||
dictionaryOffsets.length < dictionarySize + 1) {
@@ -767,7 +1027,8 @@ class RecordReaderImpl implements Record
// set up the row reader
name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
- reader = new RunLengthIntegerReader(streams.get(name), false);
+ reader = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(name), false);
}
@Override
@@ -839,7 +1100,9 @@ class RecordReaderImpl implements Record
void seek(PositionProvider[] index) throws IOException {
super.seek(index);
for(TreeReader kid: fields) {
- kid.seek(index);
+ if (kid != null) {
+ kid.seek(index);
+ }
}
}
@@ -885,7 +1148,9 @@ class RecordReaderImpl implements Record
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(TreeReader field: fields) {
- field.skipRows(items);
+ if (field != null) {
+ field.skipRows(items);
+ }
}
}
}
@@ -965,7 +1230,7 @@ class RecordReaderImpl implements Record
private static class ListTreeReader extends TreeReader {
private final TreeReader elementReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
ListTreeReader(Path path, int columnId,
List<OrcProto.Type> types,
@@ -1014,12 +1279,22 @@ class RecordReaderImpl implements Record
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
if (elementReader != null) {
elementReader.startStripe(streams, encodings);
}
@@ -1039,7 +1314,7 @@ class RecordReaderImpl implements Record
private static class MapTreeReader extends TreeReader {
private final TreeReader keyReader;
private final TreeReader valueReader;
- private RunLengthIntegerReader lengths;
+ private IntegerReader lengths = null;
MapTreeReader(Path path,
int columnId,
@@ -1092,12 +1367,22 @@ class RecordReaderImpl implements Record
}
@Override
+ void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
+ if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
+ (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
+ throw new IOException("Unknown encoding " + encoding + " in column " +
+ columnId + " of " + path);
+ }
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
super.startStripe(streams, encodings);
- lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
- OrcProto.Stream.Kind.LENGTH)), false);
+ lengths = createIntegerReader(encodings.get(columnId).getKind(),
+ streams.get(new StreamName(columnId,
+ OrcProto.Stream.Kind.LENGTH)), false);
if (keyReader != null) {
keyReader.startStripe(streams, encodings);
}
@@ -1173,113 +1458,707 @@ class RecordReaderImpl implements Record
ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
file.seek(offset);
file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
- return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf,
- codec, bufferSize));
+ return OrcProto.StripeFooter.parseFrom(InStream.create("footer",
+ new ByteBuffer[]{tailBuf}, new long[]{0}, tailLength, codec,
+ bufferSize));
}
- private void readStripe() throws IOException {
- StripeInformation stripe = stripes.get(currentStripe);
- stripeFooter = readStripeFooter(stripe);
- long offset = stripe.getOffset();
- streams.clear();
+ static enum Location {
+ BEFORE, MIN, MIDDLE, MAX, AFTER
+ }
- // if we aren't projecting columns, just read the whole stripe
- if (included == null) {
- byte[] buffer =
- new byte[(int) (stripe.getDataLength())];
- file.seek(offset + stripe.getIndexLength());
- file.readFully(buffer, 0, buffer.length);
- int sectionOffset = 0;
- for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
- if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
- int sectionLength = (int) section.getLength();
- ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
- sectionLength);
- StreamName name = new StreamName(section.getColumn(),
- section.getKind());
- streams.put(name,
- InStream.create(name.toString(), sectionBuffer, codec,
- bufferSize));
- sectionOffset += sectionLength;
- }
+ /**
+ * Given a point and min and max, determine if the point is before, at the
+ * min, in the middle, at the max, or after the range.
+ * @param point the point to test
+ * @param min the minimum point
+ * @param max the maximum point
+ * @param <T> the type of the comparison
+ * @return the location of the point
+ */
+ static <T> Location compareToRange(Comparable<T> point, T min, T max) {
+ int minCompare = point.compareTo(min);
+ if (minCompare < 0) {
+ return Location.BEFORE;
+ } else if (minCompare == 0) {
+ return Location.MIN;
+ }
+ int maxCompare = point.compareTo(max);
+ if (maxCompare > 0) {
+ return Location.AFTER;
+ } else if (maxCompare == 0) {
+ return Location.MAX;
+ }
+ return Location.MIDDLE;
+ }
+
+ /**
+ * Get the minimum value out of an index entry.
+ * @param index the index entry
+ * @return the object for the minimum value or null if there isn't one
+ */
+ static Object getMin(OrcProto.ColumnStatistics index) {
+ if (index.hasIntStatistics()) {
+ OrcProto.IntegerStatistics stat = index.getIntStatistics();
+ if (stat.hasMinimum()) {
+ return stat.getMinimum();
}
- } else {
- List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
- // the index of the current section
- int currentSection = 0;
- while (currentSection < streamList.size() &&
- StreamName.getArea(streamList.get(currentSection).getKind()) !=
- StreamName.Area.DATA) {
- currentSection += 1;
- }
- // byte position of the current section relative to the stripe start
- long sectionOffset = stripe.getIndexLength();
- while (currentSection < streamList.size()) {
- int bytes = 0;
-
- // find the first section that shouldn't be read
- int excluded=currentSection;
- while (excluded < streamList.size() &&
- included[streamList.get(excluded).getColumn()]) {
- bytes += streamList.get(excluded).getLength();
- excluded += 1;
- }
-
- // actually read the bytes as a big chunk
- if (bytes != 0) {
- byte[] buffer = new byte[bytes];
- file.seek(offset + sectionOffset);
- file.readFully(buffer, 0, bytes);
- sectionOffset += bytes;
-
- // create the streams for the sections we just read
- bytes = 0;
- while (currentSection < excluded) {
- OrcProto.Stream section = streamList.get(currentSection);
- StreamName name =
- new StreamName(section.getColumn(), section.getKind());
- this.streams.put(name,
- InStream.create(name.toString(),
- ByteBuffer.wrap(buffer, bytes,
- (int) section.getLength()), codec, bufferSize));
- currentSection += 1;
- bytes += section.getLength();
+ }
+ if (index.hasStringStatistics()) {
+ OrcProto.StringStatistics stat = index.getStringStatistics();
+ if (stat.hasMinimum()) {
+ return stat.getMinimum();
+ }
+ }
+ if (index.hasDoubleStatistics()) {
+ OrcProto.DoubleStatistics stat = index.getDoubleStatistics();
+ if (stat.hasMinimum()) {
+ return stat.getMinimum();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Get the maximum value out of an index entry.
+ * @param index the index entry
+ * @return the object for the maximum value or null if there isn't one
+ */
+ static Object getMax(OrcProto.ColumnStatistics index) {
+ if (index.hasIntStatistics()) {
+ OrcProto.IntegerStatistics stat = index.getIntStatistics();
+ if (stat.hasMaximum()) {
+ return stat.getMaximum();
+ }
+ }
+ if (index.hasStringStatistics()) {
+ OrcProto.StringStatistics stat = index.getStringStatistics();
+ if (stat.hasMaximum()) {
+ return stat.getMaximum();
+ }
+ }
+ if (index.hasDoubleStatistics()) {
+ OrcProto.DoubleStatistics stat = index.getDoubleStatistics();
+ if (stat.hasMaximum()) {
+ return stat.getMaximum();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Evaluate a predicate with respect to the statistics from the column
+ * that is referenced in the predicate.
+ * @param index the statistics for the column mentioned in the predicate
+ * @param predicate the leaf predicate we need to evaluation
+ * @return the set of truth values that may be returned for the given
+ * predicate.
+ */
+ static TruthValue evaluatePredicate(OrcProto.ColumnStatistics index,
+ PredicateLeaf predicate) {
+ Object minValue = getMin(index);
+ // if we didn't have any values, everything must have been null
+ if (minValue == null) {
+ if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+ return TruthValue.YES;
+ } else {
+ return TruthValue.NULL;
+ }
+ }
+ Object maxValue = getMax(index);
+ Location loc;
+ switch (predicate.getOperator()) {
+ case NULL_SAFE_EQUALS:
+ loc = compareToRange((Comparable) predicate.getLiteral(),
+ minValue, maxValue);
+ if (loc == Location.BEFORE || loc == Location.AFTER) {
+ return TruthValue.NO;
+ } else {
+ return TruthValue.YES_NO;
+ }
+ case EQUALS:
+ loc = compareToRange((Comparable) predicate.getLiteral(),
+ minValue, maxValue);
+ if (minValue.equals(maxValue) && loc == Location.MIN) {
+ return TruthValue.YES_NULL;
+ } else if (loc == Location.BEFORE || loc == Location.AFTER) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
+ }
+ case LESS_THAN:
+ loc = compareToRange((Comparable) predicate.getLiteral(),
+ minValue, maxValue);
+ if (loc == Location.AFTER) {
+ return TruthValue.YES_NULL;
+ } else if (loc == Location.BEFORE || loc == Location.MIN) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
+ }
+ case LESS_THAN_EQUALS:
+ loc = compareToRange((Comparable) predicate.getLiteral(),
+ minValue, maxValue);
+ if (loc == Location.AFTER || loc == Location.MAX) {
+ return TruthValue.YES_NULL;
+ } else if (loc == Location.BEFORE) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
+ }
+ case IN:
+ if (minValue.equals(maxValue)) {
+ // for a single value, look through to see if that value is in the
+ // set
+ for(Object arg: predicate.getLiteralList()) {
+ loc = compareToRange((Comparable) arg, minValue, maxValue);
+ if (loc == Location.MIN) {
+ return TruthValue.YES_NULL;
+ }
}
+ return TruthValue.NO_NULL;
+ } else {
+ // are all of the values outside of the range?
+ for(Object arg: predicate.getLiteralList()) {
+ loc = compareToRange((Comparable) arg, minValue, maxValue);
+ if (loc == Location.MIN || loc == Location.MIDDLE ||
+ loc == Location.MAX) {
+ return TruthValue.YES_NO_NULL;
+ }
+ }
+ return TruthValue.NO_NULL;
}
-
- // skip forward until we get back to a section that we need
- while (currentSection < streamList.size() &&
- !included[streamList.get(currentSection).getColumn()]) {
- sectionOffset += streamList.get(currentSection).getLength();
- currentSection += 1;
+ case BETWEEN:
+ List<Object> args = predicate.getLiteralList();
+ loc = compareToRange((Comparable) args.get(0), minValue, maxValue);
+ if (loc == Location.BEFORE || loc == Location.MIN) {
+ Location loc2 = compareToRange((Comparable) args.get(1), minValue,
+ maxValue);
+ if (loc2 == Location.AFTER || loc2 == Location.MAX) {
+ return TruthValue.YES_NULL;
+ } else if (loc2 == Location.BEFORE) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
+ }
+ } else if (loc == Location.AFTER) {
+ return TruthValue.NO_NULL;
+ } else {
+ return TruthValue.YES_NO_NULL;
}
+ case IS_NULL:
+ return TruthValue.YES_NO;
+ default:
+ return TruthValue.YES_NO_NULL;
+ }
+ }
+
+ /**
+ * Pick the row groups that we need to load from the current stripe.
+ * @return an array with a boolean for each row group or null if all of the
+ * row groups must be read.
+ * @throws IOException
+ */
+ private boolean[] pickRowGroups() throws IOException {
+ // if we don't have a sarg or indexes, we read everything
+ if (sarg == null || rowIndexStride == 0) {
+ return null;
+ }
+ readRowIndex();
+ long rowsInStripe = stripes.get(currentStripe).getNumberOfRows();
+ int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) /
+ rowIndexStride);
+ boolean[] result = new boolean[groupsInStripe];
+ TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+ for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
+ for(int pred=0; pred < leafValues.length; ++pred) {
+ OrcProto.ColumnStatistics stats =
+ indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
+ leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stats = " + stats);
+ LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
+ leafValues[pred]);
+ }
+ }
+ result[rowGroup] = sarg.evaluate(leafValues).isNotNeeded();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+ (rowIndexStride * (rowGroup+1) - 1) + " is " +
+ (result[rowGroup] ? "" : "not ") + "included.");
}
}
- reader.startStripe(streams, stripeFooter.getColumnsList());
- rowInStripe = 0;
+
+ // if we found something to skip, use the array. otherwise, return null.
+ for(boolean b: result) {
+ if (!b) {
+ return result;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Read the current stripe into memory.
+ * @throws IOException
+ */
+ private void readStripe() throws IOException {
+ StripeInformation stripe = stripes.get(currentStripe);
+ stripeFooter = readStripeFooter(stripe);
+ streams.clear();
+ // setup the position in the stripe
rowCountInStripe = stripe.getNumberOfRows();
+ rowInStripe = 0;
rowBaseInStripe = 0;
for(int i=0; i < currentStripe; ++i) {
rowBaseInStripe += stripes.get(i).getNumberOfRows();
}
+ // reset all of the indexes
for(int i=0; i < indexes.length; ++i) {
indexes[i] = null;
}
+ includedRowGroups = pickRowGroups();
+
+ // move forward to the first unskipped row
+ if (includedRowGroups != null) {
+ while (rowInStripe < rowCountInStripe &&
+ !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
+ rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
+ }
+ }
+
+ // if we haven't skipped the whole stripe, read the data
+ if (rowInStripe < rowCountInStripe) {
+ // if we aren't projecting columns or filtering rows, just read it all
+ if (included == null && includedRowGroups == null) {
+ readAllDataStreams(stripe);
+ } else {
+ readPartialDataStreams(stripe);
+ }
+ reader.startStripe(streams, stripeFooter.getColumnsList());
+ // if we skipped the first row group, move the pointers forward
+ if (rowInStripe != 0) {
+ seekToRowEntry((int) (rowInStripe / rowIndexStride));
+ }
+ }
+ }
+
+ private void readAllDataStreams(StripeInformation stripe
+ ) throws IOException {
+ byte[] buffer =
+ new byte[(int) (stripe.getDataLength())];
+ file.seek(stripe.getOffset() + stripe.getIndexLength());
+ file.readFully(buffer, 0, buffer.length);
+ int sectionOffset = 0;
+ for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
+ if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
+ int sectionLength = (int) section.getLength();
+ ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
+ sectionLength);
+ StreamName name = new StreamName(section.getColumn(),
+ section.getKind());
+ streams.put(name,
+ InStream.create(name.toString(), new ByteBuffer[]{sectionBuffer},
+ new long[]{0}, sectionLength, codec, bufferSize));
+ sectionOffset += sectionLength;
+ }
+ }
+ }
+
+ /**
+ * The sections of the stripe that we need to read.
+ */
+ static class DiskRange {
+ /** the first address we need to read. */
+ long offset;
+ /** the first address afterwards. */
+ long end;
+
+ DiskRange(long offset, long end) {
+ this.offset = offset;
+ this.end = end;
+ if (end < offset) {
+ throw new IllegalArgumentException("invalid range " + this);
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || other.getClass() != getClass()) {
+ return false;
+ }
+ DiskRange otherR = (DiskRange) other;
+ return otherR.offset == offset && otherR.end == end;
+ }
+
+ @Override
+ public String toString() {
+ return "range start: " + offset + " end: " + end;
+ }
+ }
+
+ private static final int BYTE_STREAM_POSITIONS = 1;
+ private static final int RUN_LENGTH_BYTE_POSITIONS =
+ BYTE_STREAM_POSITIONS + 1;
+ private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
+ private static final int RUN_LENGTH_INT_POSITIONS =
+ BYTE_STREAM_POSITIONS + 1;
+
+ /**
+ * Get the offset in the index positions for the column that the given
+ * stream starts.
+ * @param encoding the encoding of the column
+ * @param type the type of the column
+ * @param stream the kind of the stream
+ * @param isCompressed is the file compressed
+ * @param hasNulls does the column have a PRESENT stream?
+ * @return the number of positions that will be used for that stream
+ */
+ static int getIndexPosition(OrcProto.ColumnEncoding.Kind encoding,
+ OrcProto.Type.Kind type,
+ OrcProto.Stream.Kind stream,
+ boolean isCompressed,
+ boolean hasNulls) {
+ if (stream == OrcProto.Stream.Kind.PRESENT) {
+ return 0;
+ }
+ int compressionValue = isCompressed ? 1 : 0;
+ int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+ switch (type) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case STRUCT:
+ case MAP:
+ case LIST:
+ case UNION:
+ return base;
+ case STRING:
+ if (encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+ encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+ return base;
+ } else {
+ if (stream == OrcProto.Stream.Kind.DATA) {
+ return base;
+ } else {
+ return base + BYTE_STREAM_POSITIONS + compressionValue;
+ }
+ }
+ case BINARY:
+ if (stream == OrcProto.Stream.Kind.DATA) {
+ return base;
+ }
+ return base + BYTE_STREAM_POSITIONS + compressionValue;
+ case DECIMAL:
+ if (stream == OrcProto.Stream.Kind.DATA) {
+ return base;
+ }
+ return base + BYTE_STREAM_POSITIONS + compressionValue;
+ case TIMESTAMP:
+ if (stream == OrcProto.Stream.Kind.DATA) {
+ return base;
+ }
+ return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+ default:
+ throw new IllegalArgumentException("Unknown type " + type);
+ }
+ }
+
+ // for uncompressed streams, what is the most overlap with the following set
+ // of rows (long vint literal group).
+ static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+
+ /**
+ * Is this stream part of a dictionary?
+ * @return is this part of a dictionary?
+ */
+ static boolean isDictionary(OrcProto.Stream.Kind kind,
+ OrcProto.ColumnEncoding encoding) {
+ OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+ return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
+ (kind == OrcProto.Stream.Kind.LENGTH &&
+ (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+ encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+ }
+
+ /**
+ * Plan the ranges of the file that we need to read given the list of
+ * columns and row groups.
+ * @param streamList the list of streams available
+ * @param indexes the indexes that have been loaded
+ * @param includedColumns which columns are needed
+ * @param includedRowGroups which row groups are needed
+ * @param isCompressed does the file have generic compression
+ * @param encodings the encodings for each column
+ * @param types the types of the columns
+ * @param compressionSize the compression block size
+ * @return the list of disk ranges that will be loaded
+ */
+ static List<DiskRange> planReadPartialDataStreams
+ (List<OrcProto.Stream> streamList,
+ OrcProto.RowIndex[] indexes,
+ boolean[] includedColumns,
+ boolean[] includedRowGroups,
+ boolean isCompressed,
+ List<OrcProto.ColumnEncoding> encodings,
+ List<OrcProto.Type> types,
+ int compressionSize) {
+ List<DiskRange> result = new ArrayList<DiskRange>();
+ long offset = 0;
+ // figure out which columns have a present stream
+ boolean[] hasNull = new boolean[types.size()];
+ for(OrcProto.Stream stream: streamList) {
+ if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
+ hasNull[stream.getColumn()] = true;
+ }
+ }
+ for(OrcProto.Stream stream: streamList) {
+ long length = stream.getLength();
+ int column = stream.getColumn();
+ OrcProto.Stream.Kind streamKind = stream.getKind();
+ if (StreamName.getArea(streamKind) == StreamName.Area.DATA &&
+ includedColumns[column]) {
+ // if we aren't filtering or it is a dictionary, load it.
+ if (includedRowGroups == null ||
+ isDictionary(streamKind, encodings.get(column))) {
+ result.add(new DiskRange(offset, offset + length));
+ } else {
+ for(int group=0; group < includedRowGroups.length; ++group) {
+ if (includedRowGroups[group]) {
+ int posn = getIndexPosition(encodings.get(column).getKind(),
+ types.get(column).getKind(), stream.getKind(), isCompressed,
+ hasNull[column]);
+ long start = indexes[column].getEntry(group).getPositions(posn);
+ // figure out the worst case last location
+ long end = (group == includedRowGroups.length - 1) ?
+ length : Math.min(length,
+ indexes[column].getEntry(group + 1)
+ .getPositions(posn)
+ + (isCompressed ?
+ (OutStream.HEADER_SIZE
+ + compressionSize) :
+ WORST_UNCOMPRESSED_SLOP));
+ result.add(new DiskRange(offset + start, offset + end));
+ }
+ }
+ }
+ }
+ offset += length;
+ }
+ return result;
+ }
+
+ /**
+ * Update the disk ranges to collapse adjacent or overlapping ranges. It
+ * assumes that the ranges are sorted.
+ * @param ranges the list of disk ranges to merge
+ */
+ static void mergeDiskRanges(List<DiskRange> ranges) {
+ DiskRange prev = null;
+ for(int i=0; i < ranges.size(); ++i) {
+ DiskRange current = ranges.get(i);
+ if (prev != null && overlap(prev.offset, prev.end,
+ current.offset, current.end)) {
+ prev.offset = Math.min(prev.offset, current.offset);
+ prev.end = Math.max(prev.end, current.end);
+ ranges.remove(i);
+ i -= 1;
+ } else {
+ prev = current;
+ }
+ }
+ }
+
+ /**
+ * Read the list of ranges from the file.
+ * @param file the file to read
+ * @param base the base of the stripe
+ * @param ranges the disk ranges within the stripe to read
+ * @return the bytes read for each disk range, which is the same length as
+ * ranges
+ * @throws IOException
+ */
+ static byte[][] readDiskRanges(FSDataInputStream file,
+ long base,
+ List<DiskRange> ranges) throws IOException {
+ byte[][] result = new byte[ranges.size()][];
+ int i = 0;
+ for(DiskRange range: ranges) {
+ int len = (int) (range.end - range.offset);
+ result[i] = new byte[len];
+ file.seek(base + range.offset);
+ file.readFully(result[i]);
+ i += 1;
+ }
+ return result;
+ }
+
+ /**
+ * Does region A overlap region B? The end points are inclusive on both sides.
+ * @param leftA A's left point
+ * @param rightA A's right point
+ * @param leftB B's left point
+ * @param rightB B's right point
+ * @return Does region A overlap region B?
+ */
+ static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+ if (leftA <= leftB) {
+ return rightA >= leftB;
+ }
+ return rightB >= leftA;
+ }
+
+ /**
+ * Build a string representation of a list of disk ranges.
+ * @param ranges ranges to stringify
+ * @return the resulting string
+ */
+ static String stringifyDiskRanges(List<DiskRange> ranges) {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append("[");
+ for(int i=0; i < ranges.size(); ++i) {
+ if (i != 0) {
+ buffer.append(", ");
+ }
+ buffer.append(ranges.get(i).toString());
+ }
+ buffer.append("]");
+ return buffer.toString();
+ }
+
+ static void createStreams(List<OrcProto.Stream> streamDescriptions,
+ List<DiskRange> ranges,
+ byte[][] bytes,
+ boolean[] includeColumn,
+ CompressionCodec codec,
+ int bufferSize,
+ Map<StreamName, InStream> streams
+ ) throws IOException {
+ long offset = 0;
+ for(OrcProto.Stream streamDesc: streamDescriptions) {
+ int column = streamDesc.getColumn();
+ if (includeColumn[column] &&
+ StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) {
+ long length = streamDesc.getLength();
+ int first = -1;
+ int last = -2;
+ for(int i=0; i < bytes.length; ++i) {
+ DiskRange range = ranges.get(i);
+ if (overlap(offset, offset+length, range.offset, range.end)) {
+ if (first == -1) {
+ first = i;
+ }
+ last = i;
+ }
+ }
+ ByteBuffer[] buffers = new ByteBuffer[last - first + 1];
+ long[] offsets = new long[last - first + 1];
+ for(int i=0; i < buffers.length; ++i) {
+ DiskRange range = ranges.get(i + first);
+ long start = Math.max(range.offset, offset);
+ long end = Math.min(range.end, offset+length);
+ buffers[i] = ByteBuffer.wrap(bytes[first + i],
+ Math.max(0, (int) (offset - range.offset)), (int) (end - start));
+ offsets[i] = Math.max(0, range.offset - offset);
+ }
+ StreamName name = new StreamName(column, streamDesc.getKind());
+ streams.put(name, InStream.create(name.toString(), buffers, offsets,
+ length, codec, bufferSize));
+ }
+ offset += streamDesc.getLength();
+ }
+ }
+
+ private void readPartialDataStreams(StripeInformation stripe
+ ) throws IOException {
+ List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+ List<DiskRange> chunks =
+ planReadPartialDataStreams(streamList,
+ indexes, included, includedRowGroups, codec != null,
+ stripeFooter.getColumnsList(), types, bufferSize);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("chunks = " + stringifyDiskRanges(chunks));
+ }
+ mergeDiskRanges(chunks);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("merge = " + stringifyDiskRanges(chunks));
+ }
+ byte[][] bytes = readDiskRanges(file, stripe.getOffset(), chunks);
+ createStreams(streamList, chunks, bytes, included, codec, bufferSize,
+ streams);
}
@Override
public boolean hasNext() throws IOException {
- return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1;
+ return rowInStripe < rowCountInStripe;
}
- @Override
- public Object next(Object previous) throws IOException {
- if (rowInStripe >= rowCountInStripe) {
+ /**
+ * Read the next stripe until we find a row that we don't skip.
+ * @throws IOException
+ */
+ private void advanceStripe() throws IOException {
+ rowInStripe = rowCountInStripe;
+ while (rowInStripe >= rowCountInStripe &&
+ currentStripe < stripes.size() - 1) {
currentStripe += 1;
readStripe();
}
+ }
+
+ /**
+ * Skip over rows that we aren't selecting, so that the next row is
+ * one that we will read.
+ * @param nextRow the row we want to go to
+ * @throws IOException
+ */
+ private void advanceToNextRow(long nextRow) throws IOException {
+ long nextRowInStripe = nextRow - rowBaseInStripe;
+ // check for row skipping
+ if (rowIndexStride != 0 &&
+ includedRowGroups != null &&
+ nextRowInStripe < rowCountInStripe) {
+ int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+ if (!includedRowGroups[rowGroup]) {
+ while (rowGroup < includedRowGroups.length &&
+ !includedRowGroups[rowGroup]) {
+ rowGroup += 1;
+ }
+ // if we are off the end of the stripe, just move stripes
+ if (rowGroup >= includedRowGroups.length) {
+ advanceStripe();
+ return;
+ }
+ nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
+ }
+ }
+ if (nextRowInStripe < rowCountInStripe) {
+ if (nextRowInStripe != rowInStripe) {
+ if (rowIndexStride != 0) {
+ int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+ seekToRowEntry(rowGroup);
+ reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+ } else {
+ reader.skipRows(nextRowInStripe - rowInStripe);
+ }
+ rowInStripe = nextRowInStripe;
+ }
+ } else {
+ advanceStripe();
+ }
+ }
+
+ @Override
+ public Object next(Object previous) throws IOException {
+ Object result = reader.next(previous);
+ // find the next row
rowInStripe += 1;
- return reader.next(previous);
+ advanceToNextRow(rowInStripe + rowBaseInStripe);
+ return result;
}
@Override
@@ -1303,14 +2182,6 @@ class RecordReaderImpl implements Record
}
private int findStripe(long rowNumber) {
- if (rowNumber < 0) {
- throw new IllegalArgumentException("Seek to a negative row number " +
- rowNumber);
- } else if (rowNumber < firstRow) {
- throw new IllegalArgumentException("Seek before reader range " +
- rowNumber);
- }
- rowNumber -= firstRow;
for(int i=0; i < stripes.size(); i++) {
StripeInformation stripe = stripes.get(i);
if (stripe.getNumberOfRows() > rowNumber) {
@@ -1331,7 +2202,8 @@ class RecordReaderImpl implements Record
file.seek(offset);
file.readFully(buffer);
indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
- ByteBuffer.wrap(buffer), codec, bufferSize));
+ new ByteBuffer[] {ByteBuffer.wrap(buffer)}, new long[]{0},
+ stream.getLength(), codec, bufferSize));
}
}
offset += stream.getLength();
@@ -1351,19 +2223,25 @@ class RecordReaderImpl implements Record
@Override
public void seekToRow(long rowNumber) throws IOException {
+ if (rowNumber < 0) {
+ throw new IllegalArgumentException("Seek to a negative row number " +
+ rowNumber);
+ } else if (rowNumber < firstRow) {
+ throw new IllegalArgumentException("Seek before reader range " +
+ rowNumber);
+ }
+ // convert to our internal form (rows from the beginning of slice)
+ rowNumber -= firstRow;
+
+ // move to the right stripe
int rightStripe = findStripe(rowNumber);
if (rightStripe != currentStripe) {
currentStripe = rightStripe;
readStripe();
}
readRowIndex();
- rowInStripe = rowNumber - rowBaseInStripe;
- if (rowIndexStride != 0) {
- long entry = rowInStripe / rowIndexStride;
- seekToRowEntry((int) entry);
- reader.skipRows(rowInStripe - entry * rowIndexStride);
- } else {
- reader.skipRows(rowInStripe);
- }
+
+ // if we aren't to the right row yet, advance in the stripe.
+ advanceToNextRow(rowNumber);
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java Mon Aug 26 21:42:12 2013
@@ -57,7 +57,7 @@ class RunLengthByteReader {
while (bytes < numLiterals) {
int result = input.read(literals, bytes, numLiterals - bytes);
if (result == -1) {
- throw new EOFException("Reading RLE byte literal got EOF");
+ throw new EOFException("Reading RLE byte literal got EOF in " + this);
}
bytes += result;
}
@@ -108,4 +108,10 @@ class RunLengthByteReader {
items -= consume;
}
}
+
+ @Override
+ public String toString() {
+ return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
+ used + "/" + numLiterals + " from " + input;
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Mon Aug 26 21:42:12 2013
@@ -23,7 +23,7 @@ import java.io.IOException;
/**
* A reader that reads a sequence of integers.
* */
-class RunLengthIntegerReader {
+class RunLengthIntegerReader implements IntegerReader {
private final InStream input;
private final boolean signed;
private final long[] literals =
@@ -71,11 +71,13 @@ class RunLengthIntegerReader {
}
}
- boolean hasNext() throws IOException {
+ @Override
+ public boolean hasNext() throws IOException {
return used != numLiterals || input.available() > 0;
}
- long next() throws IOException {
+ @Override
+ public long next() throws IOException {
long result;
if (used == numLiterals) {
readValues();
@@ -88,7 +90,8 @@ class RunLengthIntegerReader {
return result;
}
- void seek(PositionProvider index) throws IOException {
+ @Override
+ public void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
if (consumed != 0) {
@@ -104,7 +107,8 @@ class RunLengthIntegerReader {
}
}
- void skip(long numValues) throws IOException {
+ @Override
+ public void skip(long numValues) throws IOException {
while (numValues > 0) {
if (used == numLiterals) {
readValues();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriter.java Mon Aug 26 21:42:12 2013
@@ -25,7 +25,7 @@ import java.io.IOException;
* repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
* literal vint values follow.
*/
-class RunLengthIntegerWriter {
+class RunLengthIntegerWriter implements IntegerWriter {
static final int MIN_REPEAT_SIZE = 3;
static final int MAX_DELTA = 127;
static final int MIN_DELTA = -128;
@@ -71,12 +71,14 @@ class RunLengthIntegerWriter {
}
}
- void flush() throws IOException {
+ @Override
+ public void flush() throws IOException {
writeValues();
output.flush();
}
- void write(long value) throws IOException {
+ @Override
+ public void write(long value) throws IOException {
if (numLiterals == 0) {
literals[numLiterals++] = value;
tailRunLength = 1;
@@ -130,8 +132,10 @@ class RunLengthIntegerWriter {
}
}
- void getPosition(PositionRecorder recorder) throws IOException {
+ @Override
+ public void getPosition(PositionRecorder recorder) throws IOException {
output.getPosition(recorder);
recorder.addPosition(numLiterals);
}
+
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Mon Aug 26 21:42:12 2013
@@ -185,4 +185,279 @@ final class SerializationUtils {
result = result.shiftRight(1);
return result;
}
+
+ enum FixedBitSizes {
+ ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+ THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
+ TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
+ TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+ }
+
+ /**
+ * Count the number of bits required to encode the given value
+ * @param value
+ * @return bits required to store value
+ */
+ static int findClosestNumBits(long value) {
+ int count = 0;
+ while (value > 0) {
+ count++;
+ value = value >>> 1;
+ }
+ return getClosestFixedBits(count);
+ }
+
+ /**
+ * zigzag encode the given value
+ * @param val
+ * @return zigzag encoded value
+ */
+ static long zigzagEncode(long val) {
+ return (val << 1) ^ (val >> 63);
+ }
+
+ /**
+ * zigzag decode the given value
+ * @param val
+ * @return zigzag decoded value
+ */
+ static long zigzagDecode(long val) {
+ return (val >>> 1) ^ -(val & 1);
+ }
+
+ /**
+ * Compute the bits required to represent pth percentile value
+ * @param data - array
+ * @param p - percentile value (>0.0 to <=1.0)
+ * @return pth percentile bits
+ */
+ static int percentileBits(long[] data, double p) {
+ if ((p > 1.0) || (p <= 0.0)) {
+ return -1;
+ }
+
+ // histogram that store the encoded bit requirement for each values.
+ // maximum number of bits that can encoded is 32 (refer FixedBitSizes)
+ int[] hist = new int[32];
+
+ // compute the histogram
+ for(long l : data) {
+ int idx = encodeBitWidth(findClosestNumBits(l));
+ hist[idx] += 1;
+ }
+
+ int len = data.length;
+ int perLen = (int) (len * (1.0 - p));
+
+ // return the bits required by pth percentile length
+ for(int i = hist.length - 1; i >= 0; i--) {
+ perLen -= hist[i];
+ if (perLen < 0) {
+ return decodeBitWidth(i);
+ }
+ }
+
+ return 0;
+ }
+
+ /**
+ * Read n bytes in big endian order and convert to long
+ * @param input - the stream to read the bytes from
+ * @param n - number of bytes to read
+ * @return long value
+ */
+ static long bytesToLongBE(InStream input, int n) throws IOException {
+ long out = 0;
+ long val = 0;
+ while (n > 0) {
+ n--;
+ // store it in a long and then shift else integer overflow will occur
+ val = input.read();
+ out |= (val << (n * 8));
+ }
+ return out;
+ }
+
+ /**
+ * Calculate the number of bytes required
+ * @param n - number of values
+ * @param numBits - bit width
+ * @return number of bytes required
+ */
+ static int getTotalBytesRequired(int n, int numBits) {
+ return (n * numBits + 7) / 8;
+ }
+
+ /**
+ * For a given fixed bit this function will return the closest available fixed
+ * bit
+ * @param n
+ * @return closest valid fixed bit
+ */
+ static int getClosestFixedBits(int n) {
+ if (n == 0) {
+ return 1;
+ }
+
+ if (n >= 1 && n <= 24) {
+ return n;
+ } else if (n > 24 && n <= 26) {
+ return 26;
+ } else if (n > 26 && n <= 28) {
+ return 28;
+ } else if (n > 28 && n <= 30) {
+ return 30;
+ } else if (n > 30 && n <= 32) {
+ return 32;
+ } else if (n > 32 && n <= 40) {
+ return 40;
+ } else if (n > 40 && n <= 48) {
+ return 48;
+ } else if (n > 48 && n <= 56) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * Finds the closest available fixed bit width match and returns its encoded
+ * value (ordinal)
+ * @param n - fixed bit width to encode
+ * @return encoded fixed bit width
+ */
+ static int encodeBitWidth(int n) {
+ n = getClosestFixedBits(n);
+
+ if (n >= 1 && n <= 24) {
+ return n - 1;
+ } else if (n > 24 && n <= 26) {
+ return FixedBitSizes.TWENTYSIX.ordinal();
+ } else if (n > 26 && n <= 28) {
+ return FixedBitSizes.TWENTYEIGHT.ordinal();
+ } else if (n > 28 && n <= 30) {
+ return FixedBitSizes.THIRTY.ordinal();
+ } else if (n > 30 && n <= 32) {
+ return FixedBitSizes.THIRTYTWO.ordinal();
+ } else if (n > 32 && n <= 40) {
+ return FixedBitSizes.FORTY.ordinal();
+ } else if (n > 40 && n <= 48) {
+ return FixedBitSizes.FORTYEIGHT.ordinal();
+ } else if (n > 48 && n <= 56) {
+ return FixedBitSizes.FIFTYSIX.ordinal();
+ } else {
+ return FixedBitSizes.SIXTYFOUR.ordinal();
+ }
+ }
+
+ /**
+ * Decodes the ordinal fixed bit value to actual fixed bit width value
+ * @param n - encoded fixed bit width
+ * @return decoded fixed bit width
+ */
+ static int decodeBitWidth(int n) {
+ if (n >= FixedBitSizes.ONE.ordinal()
+ && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
+ return n + 1;
+ } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) {
+ return 26;
+ } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) {
+ return 28;
+ } else if (n == FixedBitSizes.THIRTY.ordinal()) {
+ return 30;
+ } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) {
+ return 32;
+ } else if (n == FixedBitSizes.FORTY.ordinal()) {
+ return 40;
+ } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) {
+ return 48;
+ } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) {
+ return 56;
+ } else {
+ return 64;
+ }
+ }
+
+ /**
+ * Bitpack and write the input values to underlying output stream
+ * @param input - values to write
+ * @param offset - offset
+ * @param len - length
+ * @param bitSize - bit width
+ * @param output - output stream
+ * @throws IOException
+ */
+ static void writeInts(long[] input, int offset, int len, int bitSize,
+ OutputStream output) throws IOException {
+ if (input == null || input.length < 1 || offset < 0 || len < 1
+ || bitSize < 1) {
+ return;
+ }
+
+ int bitsLeft = 8;
+ byte current = 0;
+ for(int i = offset; i < (offset + len); i++) {
+ long value = input[i];
+ int bitsToWrite = bitSize;
+ while (bitsToWrite > bitsLeft) {
+ // add the bits to the bottom of the current word
+ current |= value >>> (bitsToWrite - bitsLeft);
+ // subtract out the bits we just added
+ bitsToWrite -= bitsLeft;
+ // zero out the bits above bitsToWrite
+ value &= (1L << bitsToWrite) - 1;
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ bitsLeft -= bitsToWrite;
+ current |= value << bitsLeft;
+ if (bitsLeft == 0) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+ // flush
+ if (bitsLeft != 8) {
+ output.write(current);
+ current = 0;
+ bitsLeft = 8;
+ }
+ }
+
+ /**
+ * Read bitpacked integers from input stream
+ * @param buffer - input buffer
+ * @param offset - offset
+ * @param len - length
+ * @param bitSize - bit width
+ * @param input - input stream
+ * @throws IOException
+ */
+ static void readInts(long[] buffer, int offset, int len, int bitSize,
+ InStream input) throws IOException {
+ int bitsLeft = 0;
+ int current = 0;
+
+ for(int i = offset; i < (offset + len); i++) {
+ long result = 0;
+ int bitsLeftToRead = bitSize;
+ while (bitsLeftToRead > bitsLeft) {
+ result <<= bitsLeft;
+ result |= current & ((1 << bitsLeft) - 1);
+ bitsLeftToRead -= bitsLeft;
+ current = input.read();
+ bitsLeft = 8;
+ }
+
+ // handle the left over bits
+ if (bitsLeftToRead > 0) {
+ result <<= bitsLeftToRead;
+ bitsLeft -= bitsLeftToRead;
+ result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1);
+ }
+ buffer[i] = result;
+ }
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java?rev=1517707&r1=1517706&r2=1517707&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StringRedBlackTree.java Mon Aug 26 21:42:12 2013
@@ -17,11 +17,11 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.hadoop.io.Text;
-
import java.io.IOException;
import java.io.OutputStream;
+import org.apache.hadoop.io.Text;
+
/**
* A red-black tree that stores strings. The strings are stored as UTF-8 bytes
* and an offset for each entry.
@@ -147,7 +147,7 @@ class StringRedBlackTree extends RedBlac
/**
* Visit all of the nodes in the tree in sorted order.
- * @param visitor the action to be applied to each ndoe
+ * @param visitor the action to be applied to each node
* @throws IOException
*/
public void visit(Visitor visitor) throws IOException {
@@ -163,6 +163,17 @@ class StringRedBlackTree extends RedBlac
keyOffsets.clear();
}
+ public void getText(Text result, int originalPosition) {
+ int offset = keyOffsets.get(originalPosition);
+ int length;
+ if (originalPosition + 1 == keyOffsets.size()) {
+ length = byteArray.size() - offset;
+ } else {
+ length = keyOffsets.get(originalPosition + 1) - offset;
+ }
+ byteArray.setText(result, offset, length);
+ }
+
/**
* Get the size of the character data in the table.
* @return the bytes used by the table