Posted to commits@hive.apache.org by se...@apache.org on 2014/09/16 19:50:04 UTC
svn commit: r1625344 [4/4] - in /hive/branches/llap: ./
common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/conf/tez/
itests/qtest/ llap-client/ llap-client/src/ llap-client/src/java/
llap-client/src/java/org/ llap-client/src/java/org/apache/ ...
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Tue Sep 16 17:50:02 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.IOException;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
/**
@@ -75,4 +76,19 @@ public interface RecordReader {
* Seek to a particular row number.
*/
void seekToRow(long rowCount) throws IOException;
+
+ /**
+ * TODO: change to interface rather than ctx obj?
+ * TODO: write javadoc.
+ * @return
+ */
+ Object prepareColumnRead();
+
+ /**
+ * TODO: write javadoc.
+ * @param writer
+ * @return
+ */
+ boolean readNextColumnStripe(Object ctxObj, ChunkWriter writer)
+ throws IOException;
}
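For context, the two methods added above are meant to be driven in a loop by an LLAP-side caller. A minimal driver sketch, assuming a caller-supplied moreCellsToRead() stop condition, allocateWriter() buffer allocation, and ship() hand-off that are not part of this commit:

    Object ctx = recordReader.prepareColumnRead();
    while (moreCellsToRead()) {               // assumed stop condition
      ChunkWriter writer = allocateWriter();  // assumed buffer allocation
      // Returns true while the current column x stripe "cell" still has rows
      // left (i.e. the writer filled up); false once the cell is complete and
      // the context has advanced to the next column (or, after the last
      // column, the next stripe).
      boolean cellHasMoreRows = recordReader.readNextColumnStripe(ctx, writer);
      ship(writer);                           // assumed: hand the chunk to LLAP
    }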
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Sep 16 17:50:02 2014
@@ -42,6 +42,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -49,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -283,7 +287,7 @@ class RecordReaderImpl implements Record
reader = createTreeReader(path, 0, types, included, conf);
indexes = new OrcProto.RowIndex[types.size()];
rowIndexStride = strideRate;
- advanceToNextRow(0L);
+ advanceToNextRow(reader, 0L, true);
}
private static final class PositionProviderImpl implements PositionProvider {
@@ -303,7 +307,7 @@ class RecordReaderImpl implements Record
private abstract static class TreeReader {
protected final Path path;
protected final int columnId;
- private BitFieldReader present = null;
+ protected BitFieldReader present = null;
protected boolean valuePresent = false;
protected final Configuration conf;
@@ -392,7 +396,6 @@ class RecordReaderImpl implements Record
* @throws IOException
*/
Object nextVector(Object previousVector, long batchSize) throws IOException {
-
ColumnVector result = (ColumnVector) previousVector;
if (present != null) {
// Set noNulls and isNull vector of the ColumnVector based on
@@ -414,9 +417,11 @@ class RecordReaderImpl implements Record
}
return previousVector;
}
+
+ public abstract long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException;
}
- private static class BooleanTreeReader extends TreeReader{
+ private static class BooleanTreeReader extends TreeReader {
private BitFieldReader reader = null;
BooleanTreeReader(Path path, int columnId, Configuration conf) {
@@ -474,6 +479,11 @@ class RecordReaderImpl implements Record
reader.nextVector(result, batchSize);
return result;
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+ return reader.nextChunk(writer, present, rowsLeft);
+ }
}
private static class ByteTreeReader extends TreeReader{
@@ -534,6 +544,11 @@ class RecordReaderImpl implements Record
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+ return reader.nextChunk(writer, present, rowsLeft);
+ }
}
private static class ShortTreeReader extends TreeReader{
@@ -604,6 +619,11 @@ class RecordReaderImpl implements Record
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+ return reader.nextChunk(writer, present, rowsLeft);
+ }
}
private static class IntTreeReader extends TreeReader{
@@ -674,6 +694,11 @@ class RecordReaderImpl implements Record
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+ return reader.nextChunk(writer, present, rowsLeft);
+ }
}
private static class LongTreeReader extends TreeReader{
@@ -744,6 +769,11 @@ class RecordReaderImpl implements Record
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+ return reader.nextChunk(writer, present, rowsLeft);
+ }
}
private static class FloatTreeReader extends TreeReader{
@@ -826,6 +856,45 @@ class RecordReaderImpl implements Record
utils.readFloat(stream);
}
}
+
+ private double[] values;
+ private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeftToRead) throws IOException {
+ boolean mayHaveNulls = present != null;
+ NullsState nullState = mayHaveNulls ? NullsState.HAS_NULLS : NullsState.NO_NULLS;
+ int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
+ if (rowsLeftToWrite == 0) {
+ return 0; // Cannot write any rows into this writer.
+ }
+ // If we send values to llap one by one, it will be hard for it to decide how to
+ // store them wrt nulls. Therefore, we'll group values together and send in groups.
+ if (values == null) {
+ values = new double[LlapUtils.DOUBLE_GROUP_SIZE];
+ }
+ long originalRowsLeft = rowsLeftToRead;
+ // Start the big loop to read rows until we run out of either input or space.
+ while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+ int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+ presentHelper.availLength = Math.min(values.length, rowsToTransfer);
+ if (mayHaveNulls) {
+ LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+ }
+ if (presentHelper.isNullsRun) {
+ writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+ } else {
+ for (int i = 0; i < presentHelper.availLength; ++i) {
+ values[i] = utils.readFloat(stream);
+ }
+ writer.writeDoubles(values, 0, presentHelper.availLength,
+ presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : nullState);
+ }
+ rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
+ rowsLeftToRead -= presentHelper.availLength;
+ }
+ writer.finishCurrentSegment();
+ return (int)(originalRowsLeft - rowsLeftToRead);
+ }
}
private static class DoubleTreeReader extends TreeReader{
@@ -906,6 +975,46 @@ class RecordReaderImpl implements Record
items = countNonNulls(items);
stream.skip(items * 8);
}
+
+
+ private double[] values;
+ private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeftToRead) throws IOException {
+ boolean mayHaveNulls = present != null;
+ NullsState nullState = mayHaveNulls ? NullsState.HAS_NULLS : NullsState.NO_NULLS;
+ int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
+ if (rowsLeftToWrite == 0) {
+ return 0; // Cannot write any rows into this writer.
+ }
+ // If we send values to llap one by one, it will be hard for it to decide how to
+ // store them wrt nulls. Therefore, we'll group values together and send in groups.
+ if (values == null) {
+ values = new double[LlapUtils.DOUBLE_GROUP_SIZE];
+ }
+ long originalRowsLeft = rowsLeftToRead;
+ // Start the big loop to read rows until we run out of either input or space.
+ while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+ int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+ presentHelper.availLength = Math.min(values.length, rowsToTransfer);
+ if (mayHaveNulls) {
+ LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+ }
+ if (presentHelper.isNullsRun) {
+ writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+ } else {
+ for (int i = 0; i < presentHelper.availLength; ++i) {
+ values[i] = utils.readDouble(stream);
+ }
+ writer.writeDoubles(values, 0, presentHelper.availLength,
+ presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : nullState);
+ }
+ rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
+ rowsLeftToRead -= presentHelper.availLength;
+ }
+ writer.finishCurrentSegment();
+ return (int)(originalRowsLeft - rowsLeftToRead);
+ }
}
private static class BinaryTreeReader extends TreeReader{
@@ -997,9 +1106,15 @@ class RecordReaderImpl implements Record
}
stream.skip(lengthToSkip);
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ // TODO: string support would be here
+ throw new UnsupportedOperationException();
+ }
}
- private static class TimestampTreeReader extends TreeReader{
+ private static class TimestampTreeReader extends TreeReader {
private IntegerReader data = null;
private IntegerReader nanos = null;
private final LongColumnVector nanoVector = new LongColumnVector();
@@ -1128,6 +1243,12 @@ class RecordReaderImpl implements Record
data.skip(items);
nanos.skip(items);
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ // TODO: timestamp support would be here
+ throw new UnsupportedOperationException();
+ }
}
private static class DateTreeReader extends TreeReader{
@@ -1198,6 +1319,11 @@ class RecordReaderImpl implements Record
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+ return reader.nextChunk(writer, present, rowsLeft);
+ }
}
private static class DecimalTreeReader extends TreeReader{
@@ -1315,6 +1441,12 @@ class RecordReaderImpl implements Record
}
scaleStream.skip(items);
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ // TODO: decimal support would be here
+ throw new UnsupportedOperationException();
+ }
}
/**
@@ -1375,6 +1507,12 @@ class RecordReaderImpl implements Record
void skipRows(long items) throws IOException {
reader.skipRows(items);
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ // TODO: string support would be here
+ throw new UnsupportedOperationException();
+ }
}
// This class collects together very similar methods for reading an ORC vector of byte arrays and
@@ -1542,6 +1680,12 @@ class RecordReaderImpl implements Record
}
stream.skip(lengthToSkip);
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ // TODO: string support would be here
+ throw new UnsupportedOperationException();
+ }
}
/**
@@ -1717,6 +1861,12 @@ class RecordReaderImpl implements Record
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ // TODO: string support would be here
+ throw new UnsupportedOperationException();
+ }
}
private static class CharTreeReader extends StringTreeReader {
@@ -1850,6 +2000,7 @@ class RecordReaderImpl implements Record
private static class StructTreeReader extends TreeReader {
private final TreeReader[] fields;
private final String[] fieldNames;
+ private final List<TreeReader> readers;
StructTreeReader(Path path, int columnId,
List<OrcProto.Type> types,
@@ -1859,10 +2010,12 @@ class RecordReaderImpl implements Record
int fieldCount = type.getFieldNamesCount();
this.fields = new TreeReader[fieldCount];
this.fieldNames = new String[fieldCount];
+ this.readers = new ArrayList<TreeReader>();
for(int i=0; i < fieldCount; ++i) {
int subtype = type.getSubtypes(i);
if (included == null || included[subtype]) {
this.fields[i] = createTreeReader(path, subtype, types, included, conf);
+ readers.add(this.fields[i]);
}
this.fieldNames[i] = type.getFieldNames(i);
}
@@ -1904,6 +2057,21 @@ class RecordReaderImpl implements Record
return result;
}
+ /**
+ * @return Total count of <b>non-null</b> field readers.
+ */
+ int getReaderCount() {
+ return readers.size();
+ }
+
+ /**
+ * @param readerIndex Index among <b>non-null</b> readers. Not a column index!
+ * @return The readerIndex-th <b>non-null</b> field reader.
+ */
+ TreeReader getColumnReader(int readerIndex) {
+ return readers.get(readerIndex);
+ }
+
@Override
Object nextVector(Object previousVector, long batchSize) throws IOException {
ColumnVector[] result = null;
@@ -1947,6 +2115,11 @@ class RecordReaderImpl implements Record
}
}
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ throw new UnsupportedOperationException("Non-primitives are not supported");
+ }
}
private static class UnionTreeReader extends TreeReader {
@@ -2026,6 +2199,11 @@ class RecordReaderImpl implements Record
fields[i].skipRows(counts[i]);
}
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ throw new UnsupportedOperationException("Non-primitives are not supported");
+ }
}
private static class ListTreeReader extends TreeReader {
@@ -2115,6 +2293,11 @@ class RecordReaderImpl implements Record
}
elementReader.skipRows(childSkip);
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ throw new UnsupportedOperationException("Non-primitives are not supported");
+ }
}
private static class MapTreeReader extends TreeReader {
@@ -2213,6 +2396,11 @@ class RecordReaderImpl implements Record
keyReader.skipRows(childSkip);
valueReader.skipRows(childSkip);
}
+
+ @Override
+ public long nextChunk(ChunkWriter writer, long rowsLeft) {
+ throw new UnsupportedOperationException("Non-primitives are not supported");
+ }
}
private static TreeReader createTreeReader(Path path,
@@ -2669,7 +2857,7 @@ class RecordReaderImpl implements Record
reader.startStripe(streams, stripeFooter.getColumnsList());
// if we skipped the first row group, move the pointers forward
if (rowInStripe != 0) {
- seekToRowEntry((int) (rowInStripe / rowIndexStride));
+ seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
}
}
}
@@ -2680,7 +2868,7 @@ class RecordReaderImpl implements Record
long end = start + stripe.getDataLength();
// explicitly trigger 1 big read
DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
- bufferChunks = readDiskRanges(file, stripe.getOffset(), Arrays.asList(ranges));
+ bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), Arrays.asList(ranges));
List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
}
@@ -2936,7 +3124,8 @@ class RecordReaderImpl implements Record
* ranges
* @throws IOException
*/
- List<BufferChunk> readDiskRanges(FSDataInputStream file,
+ static List<BufferChunk> readDiskRanges(FSDataInputStream file,
+ ZeroCopyReaderShim zcr,
long base,
List<DiskRange> ranges) throws IOException {
ArrayList<BufferChunk> result = new ArrayList<RecordReaderImpl.BufferChunk>(ranges.size());
@@ -3062,7 +3251,7 @@ class RecordReaderImpl implements Record
if (LOG.isDebugEnabled()) {
LOG.debug("merge = " + stringifyDiskRanges(chunks));
}
- bufferChunks = readDiskRanges(file, stripe.getOffset(), chunks);
+ bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), chunks);
createStreams(streamList, bufferChunks, included, codec, bufferSize,
streams);
}
@@ -3091,7 +3280,8 @@ class RecordReaderImpl implements Record
* @param nextRow the row we want to go to
* @throws IOException
*/
- private void advanceToNextRow(long nextRow) throws IOException {
+ private boolean advanceToNextRow(
+ TreeReader reader, long nextRow, boolean canAdvanceStripe) throws IOException {
long nextRowInStripe = nextRow - rowBaseInStripe;
// check for row skipping
if (rowIndexStride != 0 &&
@@ -3099,32 +3289,35 @@ class RecordReaderImpl implements Record
nextRowInStripe < rowCountInStripe) {
int rowGroup = (int) (nextRowInStripe / rowIndexStride);
if (!includedRowGroups[rowGroup]) {
- while (rowGroup < includedRowGroups.length &&
- !includedRowGroups[rowGroup]) {
+ while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
rowGroup += 1;
}
- // if we are off the end of the stripe, just move stripes
if (rowGroup >= includedRowGroups.length) {
- advanceStripe();
- return;
+ if (canAdvanceStripe) {
+ advanceStripe();
+ }
+ return canAdvanceStripe;
}
nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
}
}
- if (nextRowInStripe < rowCountInStripe) {
- if (nextRowInStripe != rowInStripe) {
- if (rowIndexStride != 0) {
- int rowGroup = (int) (nextRowInStripe / rowIndexStride);
- seekToRowEntry(rowGroup);
- reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
- } else {
- reader.skipRows(nextRowInStripe - rowInStripe);
- }
- rowInStripe = nextRowInStripe;
+ if (nextRowInStripe >= rowCountInStripe) {
+ if (canAdvanceStripe) {
+ advanceStripe();
}
- } else {
- advanceStripe();
+ return canAdvanceStripe;
+ }
+ if (nextRowInStripe != rowInStripe) {
+ if (rowIndexStride != 0) {
+ int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+ seekToRowEntry(reader, rowGroup);
+ reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+ } else {
+ reader.skipRows(nextRowInStripe - rowInStripe);
+ }
+ rowInStripe = nextRowInStripe;
}
+ return true;
}
@Override
@@ -3132,7 +3325,7 @@ class RecordReaderImpl implements Record
Object result = reader.next(previous);
// find the next row
rowInStripe += 1;
- advanceToNextRow(rowInStripe + rowBaseInStripe);
+ advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
if (LOG.isDebugEnabled()) {
LOG.debug("row from " + reader.path);
LOG.debug("orc row = " + result);
@@ -3148,8 +3341,26 @@ class RecordReaderImpl implements Record
readStripe();
}
- long batchSize = 0;
+ long batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE);
+
+ rowInStripe += batchSize;
+ if (previous == null) {
+ ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
+ result = new VectorizedRowBatch(cols.length);
+ result.cols = cols;
+ } else {
+ result = (VectorizedRowBatch) previous;
+ result.selectedInUse = false;
+ reader.nextVector(result.cols, (int) batchSize);
+ }
+
+ result.size = (int) batchSize;
+ advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+ return result;
+ }
+ private long computeBatchSize(long targetBatchSize) {
+ long batchSize = 0;
// In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
// groups are selected then marker position is set to the end of range (subset of row groups
// within stripe). Batch size computed out of marker position makes sure that batch size is
@@ -3170,29 +3381,15 @@ class RecordReaderImpl implements Record
final long markerPosition = (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
: rowCountInStripe;
- batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (markerPosition - rowInStripe));
+ batchSize = Math.min(targetBatchSize, (markerPosition - rowInStripe));
- if (LOG.isDebugEnabled() && batchSize < VectorizedRowBatch.DEFAULT_SIZE) {
+ if (LOG.isDebugEnabled() && batchSize < targetBatchSize) {
LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
}
} else {
- batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
- }
-
- rowInStripe += batchSize;
- if (previous == null) {
- ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
- result = new VectorizedRowBatch(cols.length);
- result.cols = cols;
- } else {
- result = (VectorizedRowBatch) previous;
- result.selectedInUse = false;
- reader.nextVector(result.cols, (int) batchSize);
+ batchSize = Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
}
-
- result.size = (int) batchSize;
- advanceToNextRow(rowInStripe + rowBaseInStripe);
- return result;
+ return batchSize;
}
@Override
@@ -3257,12 +3454,11 @@ class RecordReaderImpl implements Record
return indexes;
}
- private void seekToRowEntry(int rowEntry) throws IOException {
+ private void seekToRowEntry(TreeReader reader, int rowEntry) throws IOException {
PositionProvider[] index = new PositionProvider[indexes.length];
- for(int i=0; i < indexes.length; ++i) {
+ for (int i = 0; i < indexes.length; ++i) {
if (indexes[i] != null) {
- index[i]=
- new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+ index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
}
}
reader.seek(index);
@@ -3289,6 +3485,89 @@ class RecordReaderImpl implements Record
readRowIndex(currentStripe);
// if we aren't at the right row yet, advance in the stripe.
- advanceToNextRow(rowNumber);
+ advanceToNextRow(reader, rowNumber, true);
+ }
+
+ /**
+ * Iterator-like context to read ORC as a sequence of column x stripe "cells".
+ * TODO: for this to actually be an iterator-like thing, we need to clone nested reader state.
+ * As of now, we advance parent's shared column readers separately, which would cause
+ * other calls (e.g. nextBatch) to break once nextColumnStripe is called. Currently,
+ * it is always called alone, so this is ok; context is merely a convenience class.
+ */
+ private static class ColumnReadContext {
+ public ColumnReadContext(StructTreeReader reader) {
+ StructTreeReader structReader = (StructTreeReader)reader;
+ readers = new TreeReader[structReader.getReaderCount()];
+ for (int i = 0; i < readers.length; ++i) {
+ readers[i] = structReader.getColumnReader(i);
+ }
+ }
+ /** Readers for each separate column; no nulls, just the columns being read. */
+ private final TreeReader[] readers;
+ /** Remembered row offset after a partial read of one column from stripe. */
+ private long rowInStripe = 0;
+ /** Next column to be read (index into readers). */
+ private int columnIx = 0;
+ /** Remaining row count for current stripe; same for every column, so don't recompute. */
+ private long remainingToReadFromStart = -1;
+ /** Whether the next call will be the first for this column x stripe. TODO: derive? */
+ private boolean firstCall = true;
+ }
+
+ @Override
+ public Object prepareColumnRead() {
+ return new ColumnReadContext((StructTreeReader)this.reader);
+ }
+
+ @Override
+ public boolean readNextColumnStripe(
+ Object ctxObj, ChunkWriter writer) throws IOException {
+ ColumnReadContext ctx = (ColumnReadContext)ctxObj;
+ if (rowInStripe >= rowCountInStripe) {
+ assert ctx.columnIx == 0;
+ currentStripe += 1;
+ readStripe();
+ }
+ long rowInStripeGlobal = rowInStripe; // Remember the global state.
+ rowInStripe = ctx.rowInStripe;
+ if (ctx.columnIx == 0 && ctx.firstCall) {
+ // We are starting a new stripe - remember the number of rows to read (same for all cols).
+ // Doesn't take into account space remaining in ChunkWriter.
+ ctx.remainingToReadFromStart = computeBatchSize(Long.MAX_VALUE);
+ }
+ long remainingToRead =
+ ctx.firstCall ? ctx.remainingToReadFromStart : computeBatchSize(Long.MAX_VALUE);
+ TreeReader columnReader = ctx.readers[ctx.columnIx];
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Calling nextChunk for " + remainingToRead);
+ }
+ long rowsRead = columnReader.nextChunk(writer, remainingToRead);
+ assert rowsRead <= remainingToRead;
+ rowInStripe += rowsRead;
+ boolean doneWithColumnStripe = (rowsRead == remainingToRead);
+ ctx.firstCall = doneWithColumnStripe; // If we are not done, there will be more calls.
+ if (!doneWithColumnStripe) {
+ // Note that we are only advancing the reader for the current column.
+ boolean hasRows = advanceToNextRow(columnReader, rowInStripe + rowBaseInStripe, false);
+ ctx.rowInStripe = rowInStripe; // Remember the current value for next call.
+ if (!hasRows) {
+ throw new AssertionError("No rows after advance; read "
+ + rowsRead + " out of " + remainingToRead);
+ }
+ } else {
+ // Done with some column + stripe.
+ ++ctx.columnIx;
+ if (ctx.columnIx == ctx.readers.length) {
+ // Done with the last column in this stripe; advance the global rowInStripe.
+ ctx.columnIx = 0;
+ ctx.rowInStripe = rowInStripeGlobal = rowInStripe;
+ } else {
+ // Revert the state back to start of stripe.
+ ctx.rowInStripe = rowInStripeGlobal;
+ }
+ }
+ rowInStripe = rowInStripeGlobal; // Restore global state.
+ return !doneWithColumnStripe;
}
}
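All of the primitive nextChunk implementations in this file follow the same shape; only the element type and the inner read call differ. A condensed outline of that shared loop (an annotated sketch, not a drop-in implementation):

    long nextChunk(ChunkWriter writer, long rowsLeftToRead) throws IOException {
      long rowsRead = 0;
      // 1. Ask the writer how many values fit (estimateValueCountThatFits);
      //    return 0 immediately if nothing fits.
      // 2. Loop while both input rows and writer space remain: clamp the
      //    transfer size to writer space, rows left, and the length of the
      //    current run in the present (null) stream.
      // 3. A run is homogeneous: call writeNulls(...) for a nulls run, or
      //    writeDoubles/writeLongs(...) for a values run, tagging whether a
      //    null follows so the chunk format can pick a layout; bump rowsRead.
      // 4. Re-estimate writer space and repeat.
      writer.finishCurrentSegment();
      return rowsRead;
    }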
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java Tue Sep 16 17:50:02 2014
@@ -20,7 +20,12 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
+import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
/**
* A reader that reads a sequence of bytes. A control byte is read before
@@ -39,11 +44,15 @@ class RunLengthByteReader {
this.input = input;
}
- private void readValues() throws IOException {
+ private void readValues(boolean ignoreEof) throws IOException {
int control = input.read();
used = 0;
if (control == -1) {
- throw new EOFException("Read past end of buffer RLE byte from " + input);
+ if (!ignoreEof) {
+ throw new EOFException("Read past end of buffer RLE byte from " + input);
+ }
+ used = numLiterals = 0;
+ return;
} else if (control < 0x80) {
repeat = true;
numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
@@ -73,17 +82,70 @@ class RunLengthByteReader {
byte next() throws IOException {
byte result;
if (used == numLiterals) {
- readValues();
+ readValues(false);
}
if (repeat) {
- used += 1;
result = literals[0];
} else {
- result = literals[used++];
+ result = literals[used];
}
+ ++used;
return result;
}
+
+ private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+ public int nextChunk(
+ ChunkWriter writer, BitFieldReader present, long rowsLeftToRead) throws IOException {
+ boolean mayHaveNulls = present != null;
+ int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+ if (rowsLeftToWrite == 0) {
+ return 0; // Cannot write any rows into this writer.
+ }
+ long originalRowsLeft = rowsLeftToRead;
+ // Start the big loop to read rows until we run out of either input or space.
+ while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+ int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+ presentHelper.availLength = Math.min(peekNextAvailLength(), rowsToTransfer);
+ if (mayHaveNulls) {
+ LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+ }
+ assert presentHelper.availLength > 0;
+ assert rowsLeftToRead >= presentHelper.availLength;
+
+ if (presentHelper.isNullsRun) {
+ writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+ } else {
+ NullsState nullsState = !mayHaveNulls ? NullsState.NO_NULLS :
+ (presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : NullsState.HAS_NULLS);
+ if (repeat) {
+ writer.writeRepeatedLongs(literals[0], presentHelper.availLength, nullsState);
+ } else {
+ writer.writeLongs(literals, used, presentHelper.availLength, nullsState);
+ }
+ skipCurrentLiterals(presentHelper.availLength);
+ }
+ rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+ rowsLeftToRead -= presentHelper.availLength;
+ } // End of big loop.
+ writer.finishCurrentSegment();
+ return (int)(originalRowsLeft - rowsLeftToRead);
+ }
+
+ private int peekNextAvailLength() throws IOException {
+ if (used == numLiterals) {
+ readValues(true);
+ }
+ return numLiterals - used;
+ }
+
+ private void skipCurrentLiterals(int valuesToSkip) {
+ if ((used + valuesToSkip) > numLiterals) {
+ throw new AssertionError("Skipping " + valuesToSkip + "; used " + used + "/" + numLiterals);
+ }
+ used += valuesToSkip;
+ }
+
void nextVector(LongColumnVector previous, long previousLen)
throws IOException {
previous.isRepeating = true;
@@ -113,7 +175,7 @@ class RunLengthByteReader {
if (consumed != 0) {
// a loop is required for cases where we break the run into two parts
while (consumed > 0) {
- readValues();
+ readValues(false);
used = consumed;
consumed -= numLiterals;
}
@@ -126,7 +188,7 @@ class RunLengthByteReader {
void skip(long items) throws IOException {
while (items > 0) {
if (used == numLiterals) {
- readValues();
+ readValues(false);
}
long consume = Math.min(items, numLiterals - used);
used += consume;
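The framing that readValues() decodes is the standard ORC byte-RLE scheme (MIN_REPEAT_SIZE is 3 in RunLengthByteWriter). A hedged decode sketch of one control byte, matching the branches shown above:

    int control = input.read();
    if (control < 0x80) {
      // Repeat run: (control + 3) copies of the single byte that follows.
      int runLength = control + 3;
      byte value = (byte) input.read();
      // ... emit 'value' runLength times ...
    } else {
      // Literal run: (0x100 - control) raw bytes follow, one per value.
      int literalCount = 0x100 - control;
      // ... read literalCount raw bytes ...
    }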
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Tue Sep 16 17:50:02 2014
@@ -20,7 +20,12 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.llap.api.Vector;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
/**
* A reader that reads a sequence of integers.
@@ -42,10 +47,14 @@ class RunLengthIntegerReader implements
this.utils = new SerializationUtils();
}
- private void readValues() throws IOException {
+ private void readValues(boolean ignoreEof) throws IOException {
int control = input.read();
if (control == -1) {
- throw new EOFException("Read past end of RLE integer from " + input);
+ if (!ignoreEof) {
+ throw new EOFException("Read past end of RLE integer from " + input);
+ }
+ used = numLiterals = 0;
+ return;
} else if (control < 0x80) {
numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE;
used = 0;
@@ -84,7 +93,7 @@ class RunLengthIntegerReader implements
public long next() throws IOException {
long result;
if (used == numLiterals) {
- readValues();
+ readValues(false);
}
if (repeat) {
result = literals[0] + (used++) * delta;
@@ -94,9 +103,58 @@ class RunLengthIntegerReader implements
return result;
}
+ private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+ @Override
+ public int nextChunk(
+ ChunkWriter writer, BitFieldReader present, long rowsLeftToRead) throws IOException {
+ boolean mayHaveNulls = present != null;
+ int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+ if (rowsLeftToWrite == 0) {
+ return 0; // Cannot write any rows into this writer.
+ }
+ long originalRowsLeft = rowsLeftToRead;
+ // Start the big loop to read rows until we run out of either input or space.
+ while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+ int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+ presentHelper.availLength = Math.min(peekNextAvailLength(), rowsToTransfer);
+ if (mayHaveNulls) {
+ LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+ }
+ assert presentHelper.availLength > 0;
+ assert rowsLeftToRead >= presentHelper.availLength;
+
+ if (presentHelper.isNullsRun) {
+ writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+ } else {
+ NullsState nullsState = !mayHaveNulls ? NullsState.NO_NULLS :
+ (presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : NullsState.HAS_NULLS);
+ if (repeat && delta == 0) {
+ writer.writeRepeatedLongs(literals[0], presentHelper.availLength, nullsState);
+ } else {
+ writer.writeLongs(literals, used, presentHelper.availLength, nullsState);
+ }
+ skipCurrentLiterals(presentHelper.availLength);
+ }
+ rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+ rowsLeftToRead -= presentHelper.availLength;
+ } // End of big loop.
+ writer.finishCurrentSegment();
+ return (int)(originalRowsLeft - rowsLeftToRead);
+ }
+
+ private int peekNextAvailLength() throws IOException {
+ if (used == numLiterals) {
+ readValues(true);
+ }
+ return numLiterals - used;
+ }
+
+ private void skipCurrentLiterals(int valuesToSkip) {
+ assert (used + valuesToSkip) <= numLiterals;
+ used += valuesToSkip;
+ }
@Override
- public void nextVector(LongColumnVector previous, long previousLen)
- throws IOException {
+ public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
previous.isRepeating = true;
for (int i = 0; i < previousLen; i++) {
if (!previous.isNull[i]) {
@@ -125,7 +183,7 @@ class RunLengthIntegerReader implements
if (consumed != 0) {
// a loop is required for cases where we break the run into two parts
while (consumed > 0) {
- readValues();
+ readValues(false);
used = consumed;
consumed -= numLiterals;
}
@@ -139,7 +197,7 @@ class RunLengthIntegerReader implements
public void skip(long numValues) throws IOException {
while (numValues > 0) {
if (used == numLiterals) {
- readValues();
+ readValues(false);
}
long consume = Math.min(numValues, numLiterals - used);
used += consume;
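In the v1 integer encoding, a repeat run carries a base value and a signed per-element delta, which is why next() above computes literals[0] + used * delta. Expanded, a run of length n with base b and delta d decodes as sketched below; note that nextChunk only takes the writeRepeatedLongs fast path when delta == 0, since a nonzero delta yields an arithmetic progression, not a constant run:

    // b, b+d, b+2d, ..., b+(n-1)d  (illustrative expansion)
    long base = literals[0];
    for (int i = 0; i < runLength; ++i) {
      emit(base + (long) i * delta);  // emit() is a stand-in for the consumer
    }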
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java Tue Sep 16 17:50:02 2014
@@ -19,12 +19,19 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import java.util.Arrays;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
/**
@@ -33,13 +40,17 @@ import org.apache.hadoop.hive.ql.io.orc.
* compression techniques.
*/
class RunLengthIntegerReaderV2 implements IntegerReader {
+ public static final Log LOG = LogFactory.getLog(RunLengthIntegerReaderV2.class);
+
private final InStream input;
private final boolean signed;
private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+ private boolean isRepeating = false;
private int numLiterals = 0;
private int used = 0;
private final boolean skipCorrupt;
private final SerializationUtils utils;
+ private EncodingType currentEncoding;
RunLengthIntegerReaderV2(InStream input, boolean signed,
Configuration conf) throws IOException {
@@ -49,22 +60,25 @@ class RunLengthIntegerReaderV2 implement
this.utils = new SerializationUtils();
}
- private void readValues() throws IOException {
+ private final static EncodingType[] encodings = EncodingType.values();
+ private void readValues(boolean ignoreEof) throws IOException {
// read the first 2 bits and determine the encoding type
+ isRepeating = false;
int firstByte = input.read();
if (firstByte < 0) {
- throw new EOFException("Read past end of RLE integer from " + input);
- } else {
- int enc = (firstByte >>> 6) & 0x03;
- if (EncodingType.SHORT_REPEAT.ordinal() == enc) {
- readShortRepeatValues(firstByte);
- } else if (EncodingType.DIRECT.ordinal() == enc) {
- readDirectValues(firstByte);
- } else if (EncodingType.PATCHED_BASE.ordinal() == enc) {
- readPatchedBaseValues(firstByte);
- } else {
- readDeltaValues(firstByte);
+ if (!ignoreEof) {
+ throw new EOFException("Read past end of RLE integer from " + input);
}
+ used = numLiterals = 0;
+ return;
+ }
+ currentEncoding = encodings[(firstByte >>> 6) & 0x03];
+ switch (currentEncoding) {
+ case SHORT_REPEAT: readShortRepeatValues(firstByte); break;
+ case DIRECT: readDirectValues(firstByte); break;
+ case PATCHED_BASE: readPatchedBaseValues(firstByte); break;
+ case DELTA: readDeltaValues(firstByte); break;
+ default: throw new IOException("Unknown encoding " + currentEncoding);
}
}
@@ -97,10 +111,16 @@ class RunLengthIntegerReaderV2 implement
// read the fixed delta value stored as vint (deltas can be negative even
// if all number are positive)
long fd = utils.readVslong(input);
-
- // add fixed deltas to adjacent values
- for(int i = 0; i < len; i++) {
- literals[numLiterals++] = literals[numLiterals - 2] + fd;
+ if (fd == 0) {
+ isRepeating = true;
+ assert numLiterals == 1;
+ Arrays.fill(literals, numLiterals, numLiterals + len, literals[0]);
+ numLiterals += len;
+ } else {
+ // add fixed deltas to adjacent values
+ for(int i = 0; i < len; i++) {
+ literals[numLiterals++] = literals[numLiterals - 2] + fd;
+ }
}
} else {
long deltaBase = utils.readVslong(input);
@@ -282,10 +302,18 @@ class RunLengthIntegerReaderV2 implement
val = utils.zigzagDecode(val);
}
+ if (numLiterals != 0) {
+ // Currently this always holds, which makes peekNextAvailLength simpler.
+ // If this changes, peekNextAvailLength should be adjusted accordingly.
+ throw new AssertionError("readValues called with existing values present");
+ }
// repeat the value for length times
+ isRepeating = true;
+ // TODO: this is not so useful and V1 reader doesn't do that. Fix? Same if delta == 0
for(int i = 0; i < len; i++) {
- literals[numLiterals++] = val;
+ literals[i] = val;
}
+ numLiterals = len;
}
@Override
@@ -299,7 +327,7 @@ class RunLengthIntegerReaderV2 implement
if (used == numLiterals) {
numLiterals = 0;
used = 0;
- readValues();
+ readValues(false);
}
result = literals[used++];
return result;
@@ -314,7 +342,7 @@ class RunLengthIntegerReaderV2 implement
// parts
while (consumed > 0) {
numLiterals = 0;
- readValues();
+ readValues(false);
used = consumed;
consumed -= numLiterals;
}
@@ -330,7 +358,7 @@ class RunLengthIntegerReaderV2 implement
if (used == numLiterals) {
numLiterals = 0;
used = 0;
- readValues();
+ readValues(false);
}
long consume = Math.min(numValues, numLiterals - used);
used += consume;
@@ -338,6 +366,58 @@ class RunLengthIntegerReaderV2 implement
}
}
+ private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+ @Override
+ public int nextChunk(
+ ChunkWriter writer, BitFieldReader present, long rowsLeftToRead) throws IOException {
+ boolean mayHaveNulls = present != null;
+ int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+ if (rowsLeftToWrite == 0) {
+ return 0; // Cannot write any rows into this writer.
+ }
+ long originalRowsLeft = rowsLeftToRead;
+ // Start the big loop to read rows until we run out of either input or space.
+ while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+ int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+ presentHelper.availLength = Math.min(peekNextAvailLength(), rowsToTransfer);
+ if (mayHaveNulls) {
+ LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+ }
+ assert presentHelper.availLength > 0;
+ assert rowsLeftToRead >= presentHelper.availLength;
+ if (presentHelper.isNullsRun) {
+ writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+ } else {
+ NullsState nullsState = !mayHaveNulls ? NullsState.NO_NULLS :
+ (presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : NullsState.HAS_NULLS);
+ if (isRepeating) {
+ writer.writeRepeatedLongs(literals[0], presentHelper.availLength, nullsState);
+ } else {
+ writer.writeLongs(literals, used, presentHelper.availLength, nullsState);
+ }
+ skipCurrentLiterals(presentHelper.availLength);
+ }
+ rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+ rowsLeftToRead -= presentHelper.availLength;
+ } // End of big loop.
+ writer.finishCurrentSegment();
+ return (int)(originalRowsLeft - rowsLeftToRead);
+ }
+
+ private void skipCurrentLiterals(int valuesToSkip) {
+ assert (used + valuesToSkip) <= numLiterals;
+ used += valuesToSkip;
+ }
+
+ private int peekNextAvailLength() throws IOException {
+ if (used == numLiterals) {
+ numLiterals = 0;
+ used = 0;
+ readValues(true);
+ }
+ return numLiterals - used;
+ }
+
@Override
public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
previous.isRepeating = true;
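The encoding dispatch rewritten above keys off the top two bits of the first header byte, with ordinals matching RunLengthIntegerWriterV2.EncodingType:

    // 00 -> SHORT_REPEAT, 01 -> DIRECT, 10 -> PATCHED_BASE, 11 -> DELTA
    int enc = (firstByte >>> 6) & 0x03;
    EncodingType encoding = EncodingType.values()[enc];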
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Tue Sep 16 17:50:02 2014
@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -62,7 +65,7 @@ public class VectorizedOrcInputFormat ex
this.offset = fileSplit.getStart();
this.length = fileSplit.getLength();
options.range(offset, length);
- OrcInputFormat.setIncludedColumns(options, types, conf, true);
+ options.include(OrcInputFormat.genIncludedColumns(types, conf, true));
OrcInputFormat.setSearchArgument(options, types, conf, true);
this.reader = file.rowsOptions(options);
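genIncludedColumns returns the boolean[] that options.include(...) expects: one flag per flattened ORC type id, with index 0 being the root struct (the test change further below passes such an array literally). An illustrative value, assuming a four-column table where only the first and third columns are projected:

    // index:        0(root) 1     2      3     4
    boolean[] include = { true, true, false, true, false };
    options.include(include);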
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java Tue Sep 16 17:50:02 2014
@@ -1,13 +1,19 @@
package org.apache.hadoop.hive.ql.io.sarg;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
/**
* A factory for creating SearchArguments.
*/
public class SearchArgumentFactory {
+ public static final String SARG_PUSHDOWN = "sarg.pushdown";
+
public static SearchArgument create(ExprNodeGenericFuncDesc expression) {
return new SearchArgumentImpl(expression);
}
@@ -19,4 +25,14 @@ public class SearchArgumentFactory {
public static SearchArgument create(String kryo) {
return SearchArgumentImpl.fromKryo(kryo);
}
+
+ public static SearchArgument createFromConf(Configuration conf) {
+ String sargString = null;
+ if ((sargString = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR)) != null) {
+ return create(Utilities.deserializeExpression(sargString));
+ } else if ((sargString = conf.get(SARG_PUSHDOWN)) != null) {
+ return create(sargString);
+ }
+ return null;
+ }
}
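The new createFromConf gives callers a single entry point for both pushdown channels: a Kryo-serialized filter expression under TableScanDesc.FILTER_EXPR_CONF_STR, or a serialized SearchArgument under SARG_PUSHDOWN. A usage sketch mirroring the test change below:

    SearchArgument isNull = SearchArgumentFactory.newBuilder()
        .startAnd().isNull("cost").end().build();
    conf.set(SearchArgumentFactory.SARG_PUSHDOWN, isNull.toKryo());
    // Later, on the reader side:
    SearchArgument recovered = SearchArgumentFactory.createFromConf(conf);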
Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Tue Sep 16 17:50:02 2014
@@ -1589,7 +1589,7 @@ public class TestInputOutputFormat {
types.add(builder.build());
SearchArgument isNull = SearchArgumentFactory.newBuilder()
.startAnd().isNull("cost").end().build();
- conf.set(OrcInputFormat.SARG_PUSHDOWN, isNull.toKryo());
+ conf.set(SearchArgumentFactory.SARG_PUSHDOWN, isNull.toKryo());
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
"url,cost");
options.include(new boolean[]{true, true, false, true, false});
Added: hive/branches/llap/ql/src/test/queries/clientcompare/llap_0.q
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/queries/clientcompare/llap_0.q?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/test/queries/clientcompare/llap_0.q (added)
+++ hive/branches/llap/ql/src/test/queries/clientcompare/llap_0.q Tue Sep 16 17:50:02 2014
@@ -0,0 +1,11 @@
+SET hive.vectorized.execution.enabled=true;
+
+SELECT cfloat,
+ cint,
+ cdouble,
+ cbigint
+FROM alltypesorc
+WHERE (cbigint > -23)
+ AND ((cdouble != 988888)
+ OR (cint > -863.257))
+ORDER BY cbigint, cfloat;
Added: hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_00.qv
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_00.qv?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_00.qv (added)
+++ hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_00.qv Tue Sep 16 17:50:02 2014
@@ -0,0 +1 @@
+set hive.llap.enabled=false;
Added: hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_01.qv
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_01.qv?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_01.qv (added)
+++ hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_01.qv Tue Sep 16 17:50:02 2014
@@ -0,0 +1 @@
+set hive.llap.enabled=true;