You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by om...@apache.org on 2016/05/20 21:22:50 UTC
[12/27] hive git commit: HIVE-11417. Move the ReaderImpl and
RowReaderImpl to the ORC module,
by making shims for the row by row reader. (omalley reviewed by prasanth_j)
http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 2199b11..e46ca51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -18,1218 +18,923 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.orc.BooleanColumnStatistics;
-import org.apache.orc.impl.BufferChunk;
-import org.apache.orc.ColumnStatistics;
-import org.apache.orc.impl.ColumnStatisticsImpl;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.DataReader;
-import org.apache.orc.DateColumnStatistics;
-import org.apache.orc.DecimalColumnStatistics;
-import org.apache.orc.DoubleColumnStatistics;
-import org.apache.orc.impl.DataReaderProperties;
-import org.apache.orc.impl.InStream;
-import org.apache.orc.IntegerColumnStatistics;
-import org.apache.orc.OrcConf;
-import org.apache.orc.impl.OrcIndex;
-import org.apache.orc.impl.PositionProvider;
-import org.apache.orc.impl.StreamName;
-import org.apache.orc.StringColumnStatistics;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.TimestampColumnStatistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
-import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.BloomFilterIO;
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-public class RecordReaderImpl implements RecordReader {
+public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
+ implements RecordReader {
static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
- private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
- private static final Object UNKNOWN_VALUE = new Object();
- private final Path path;
- private final long firstRow;
- private final List<StripeInformation> stripes =
- new ArrayList<StripeInformation>();
- private OrcProto.StripeFooter stripeFooter;
- private final long totalRowCount;
- private final CompressionCodec codec;
- private final List<OrcProto.Type> types;
- private final int bufferSize;
- private final boolean[] included;
- private final long rowIndexStride;
- private long rowInStripe = 0;
- private int currentStripe = -1;
- private long rowBaseInStripe = 0;
- private long rowCountInStripe = 0;
- private final Map<StreamName, InStream> streams =
- new HashMap<StreamName, InStream>();
- DiskRangeList bufferChunks = null;
- private final TreeReaderFactory.TreeReader reader;
- private final OrcProto.RowIndex[] indexes;
- private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
- private final SargApplier sargApp;
- // an array about which row groups aren't skipped
- private boolean[] includedRowGroups = null;
- private final DataReader dataReader;
+ private final VectorizedRowBatch batch;
+ private int rowInBatch;
+ private long baseRow;
- /**
- * Given a list of column names, find the given column and return the index.
- *
- * @param columnNames the list of potential column names
- * @param columnName the column name to look for
- * @param rootColumn offset the result with the rootColumn
- * @return the column number or -1 if the column wasn't found
- */
- static int findColumns(String[] columnNames,
- String columnName,
- int rootColumn) {
- for(int i=0; i < columnNames.length; ++i) {
- if (columnName.equals(columnNames[i])) {
- return i + rootColumn;
- }
- }
- return -1;
+ protected RecordReaderImpl(ReaderImpl fileReader,
+ Reader.Options options) throws IOException {
+ super(fileReader, options);
+ batch = this.schema.createRowBatch();
+ rowInBatch = 0;
}
/**
- * Find the mapping from predicate leaves to columns.
- * @param sargLeaves the search argument that we need to map
- * @param columnNames the names of the columns
- * @param rootColumn the offset of the top level row, which offsets the
- * result
- * @return an array mapping the sarg leaves to concrete column numbers
+ * If the current batch is empty, get a new one.
+ * @return true if we have rows available.
+ * @throws IOException
*/
- public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves,
- String[] columnNames,
- int rootColumn) {
- int[] result = new int[sargLeaves.size()];
- Arrays.fill(result, -1);
- for(int i=0; i < result.length; ++i) {
- String colName = sargLeaves.get(i).getColumnName();
- result[i] = findColumns(columnNames, colName, rootColumn);
+ boolean ensureBatch() throws IOException {
+ if (rowInBatch >= batch.size) {
+ baseRow = super.getRowNumber();
+ rowInBatch = 0;
+ return super.nextBatch(batch);
}
- return result;
+ return true;
}
- protected RecordReaderImpl(ReaderImpl fileReader,
- Reader.Options options) throws IOException {
- SchemaEvolution treeReaderSchema;
- this.included = options.getInclude();
- included[0] = true;
- if (options.getSchema() == null) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Schema on read not provided -- using file schema " +
- fileReader.getSchema());
- }
- treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included);
- } else {
+ @Override
+ public long getRowNumber() {
+ return baseRow + rowInBatch;
+ }
- // Now that we are creating a record reader for a file, validate that the schema to read
- // is compatible with the file schema.
- //
- treeReaderSchema = new SchemaEvolution(fileReader.getSchema(),
- options.getSchema(),
- included);
- }
- this.path = fileReader.path;
- this.codec = fileReader.codec;
- this.types = fileReader.types;
- this.bufferSize = fileReader.bufferSize;
- this.rowIndexStride = fileReader.rowIndexStride;
- FileSystem fileSystem = fileReader.fileSystem;
- SearchArgument sarg = options.getSearchArgument();
- if (sarg != null && rowIndexStride != 0) {
- sargApp = new SargApplier(
- sarg, options.getColumnNames(), rowIndexStride, types, included.length);
- } else {
- sargApp = null;
- }
- long rows = 0;
- long skippedRows = 0;
- long offset = options.getOffset();
- long maxOffset = options.getMaxOffset();
- for(StripeInformation stripe: fileReader.getStripes()) {
- long stripeStart = stripe.getOffset();
- if (offset > stripeStart) {
- skippedRows += stripe.getNumberOfRows();
- } else if (stripeStart < maxOffset) {
- this.stripes.add(stripe);
- rows += stripe.getNumberOfRows();
- }
- }
+ @Override
+ public boolean hasNext() throws IOException {
+ return ensureBatch();
+ }
- Boolean zeroCopy = options.getUseZeroCopy();
- if (zeroCopy == null) {
- zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
- }
- if (options.getDataReader() == null) {
- dataReader = RecordReaderUtils.createDefaultDataReader(
- DataReaderProperties.builder()
- .withBufferSize(bufferSize)
- .withCompression(fileReader.compressionKind)
- .withFileSystem(fileSystem)
- .withPath(path)
- .withTypeCount(types.size())
- .withZeroCopy(zeroCopy)
- .build());
+ @Override
+ public void seekToRow(long row) throws IOException {
+ if (row >= baseRow && row < baseRow + batch.size) {
+ rowInBatch = (int) (row - baseRow);
} else {
- dataReader = options.getDataReader();
+ super.seekToRow(row);
+ batch.size = 0;
+ ensureBatch();
}
- firstRow = skippedRows;
- totalRowCount = rows;
- Boolean skipCorrupt = options.getSkipCorruptRecords();
- if (skipCorrupt == null) {
- skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
- }
-
- reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
- treeReaderSchema, included, skipCorrupt);
- indexes = new OrcProto.RowIndex[types.size()];
- bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
- advanceToNextRow(reader, 0L, true);
}
- public static final class PositionProviderImpl implements PositionProvider {
- private final OrcProto.RowIndexEntry entry;
- private int index;
-
- public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
- this(entry, 0);
+ @Override
+ public Object next(Object previous) throws IOException {
+ if (!ensureBatch()) {
+ return null;
}
-
- public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
- this.entry = entry;
- this.index = startPos;
+ if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+ OrcStruct result;
+ List<TypeDescription> children = schema.getChildren();
+ int numberOfChildren = children.size();
+ if (previous == null || previous.getClass() != OrcStruct.class) {
+ result = new OrcStruct(numberOfChildren);
+ previous = result;
+ } else {
+ result = (OrcStruct) previous;
+ if (result.getNumFields() != numberOfChildren) {
+ result.setNumFields(numberOfChildren);
+ }
+ }
+ for(int i=0; i < numberOfChildren; ++i) {
+ result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
+ children.get(i), result.getFieldValue(i)));
+ }
+ } else {
+ previous = nextValue(batch.cols[0], rowInBatch, schema, previous);
}
+ rowInBatch += 1;
+ return previous;
+ }
- @Override
- public long getNext() {
- return entry.getPositions(index++);
+ public boolean nextBatch(VectorizedRowBatch theirBatch) throws IOException {
+ // If the user hasn't been reading by row, use the fast path.
+ if (rowInBatch >= batch.size) {
+ return super.nextBatch(theirBatch);
}
+ copyIntoBatch(theirBatch, batch, rowInBatch);
+ rowInBatch += theirBatch.size;
+ return theirBatch.size > 0;
}
- OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
- return dataReader.readStripeFooter(stripe);
+ @Override
+ public void close() throws IOException {
+ super.close();
+ // free the memory for the column vectors
+ batch.cols = null;
}
- enum Location {
- BEFORE, MIN, MIDDLE, MAX, AFTER
- }
+ /* Routines for stubbing into Writables */
- /**
- * Given a point and min and max, determine if the point is before, at the
- * min, in the middle, at the max, or after the range.
- * @param point the point to test
- * @param min the minimum point
- * @param max the maximum point
- * @param <T> the type of the comparision
- * @return the location of the point
- */
- static <T> Location compareToRange(Comparable<T> point, T min, T max) {
- int minCompare = point.compareTo(min);
- if (minCompare < 0) {
- return Location.BEFORE;
- } else if (minCompare == 0) {
- return Location.MIN;
+ static BooleanWritable nextBoolean(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
}
- int maxCompare = point.compareTo(max);
- if (maxCompare > 0) {
- return Location.AFTER;
- } else if (maxCompare == 0) {
- return Location.MAX;
+ if (vector.noNulls || !vector.isNull[row]) {
+ BooleanWritable result;
+ if (previous == null || previous.getClass() != BooleanWritable.class) {
+ result = new BooleanWritable();
+ } else {
+ result = (BooleanWritable) previous;
+ }
+ result.set(((LongColumnVector) vector).vector[row] != 0);
+ return result;
+ } else {
+ return null;
}
- return Location.MIDDLE;
}
- /**
- * Get the maximum value out of an index entry.
- * @param index
- * the index entry
- * @return the object for the maximum value or null if there isn't one
- */
- static Object getMax(ColumnStatistics index) {
- if (index instanceof IntegerColumnStatistics) {
- return ((IntegerColumnStatistics) index).getMaximum();
- } else if (index instanceof DoubleColumnStatistics) {
- return ((DoubleColumnStatistics) index).getMaximum();
- } else if (index instanceof StringColumnStatistics) {
- return ((StringColumnStatistics) index).getMaximum();
- } else if (index instanceof DateColumnStatistics) {
- return ((DateColumnStatistics) index).getMaximum();
- } else if (index instanceof DecimalColumnStatistics) {
- return ((DecimalColumnStatistics) index).getMaximum();
- } else if (index instanceof TimestampColumnStatistics) {
- return ((TimestampColumnStatistics) index).getMaximum();
- } else if (index instanceof BooleanColumnStatistics) {
- if (((BooleanColumnStatistics)index).getTrueCount()!=0) {
- return Boolean.TRUE;
+ static ByteWritable nextByte(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ ByteWritable result;
+ if (previous == null || previous.getClass() != ByteWritable.class) {
+ result = new ByteWritable();
} else {
- return Boolean.FALSE;
+ result = (ByteWritable) previous;
}
+ result.set((byte) ((LongColumnVector) vector).vector[row]);
+ return result;
} else {
return null;
}
}
- /**
- * Get the minimum value out of an index entry.
- * @param index
- * the index entry
- * @return the object for the minimum value or null if there isn't one
- */
- static Object getMin(ColumnStatistics index) {
- if (index instanceof IntegerColumnStatistics) {
- return ((IntegerColumnStatistics) index).getMinimum();
- } else if (index instanceof DoubleColumnStatistics) {
- return ((DoubleColumnStatistics) index).getMinimum();
- } else if (index instanceof StringColumnStatistics) {
- return ((StringColumnStatistics) index).getMinimum();
- } else if (index instanceof DateColumnStatistics) {
- return ((DateColumnStatistics) index).getMinimum();
- } else if (index instanceof DecimalColumnStatistics) {
- return ((DecimalColumnStatistics) index).getMinimum();
- } else if (index instanceof TimestampColumnStatistics) {
- return ((TimestampColumnStatistics) index).getMinimum();
- } else if (index instanceof BooleanColumnStatistics) {
- if (((BooleanColumnStatistics)index).getFalseCount()!=0) {
- return Boolean.FALSE;
+ static ShortWritable nextShort(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ ShortWritable result;
+ if (previous == null || previous.getClass() != ShortWritable.class) {
+ result = new ShortWritable();
} else {
- return Boolean.TRUE;
+ result = (ShortWritable) previous;
}
+ result.set((short) ((LongColumnVector) vector).vector[row]);
+ return result;
} else {
- return UNKNOWN_VALUE; // null is not safe here
+ return null;
}
}
- /**
- * Evaluate a predicate with respect to the statistics from the column
- * that is referenced in the predicate.
- * @param statsProto the statistics for the column mentioned in the predicate
- * @param predicate the leaf predicate we need to evaluation
- * @param bloomFilter
- * @return the set of truth values that may be returned for the given
- * predicate.
- */
- static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
- PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
- ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
- Object minValue = getMin(cs);
- Object maxValue = getMax(cs);
- BloomFilterIO bf = null;
- if (bloomFilter != null) {
- bf = new BloomFilterIO(bloomFilter);
+ static IntWritable nextInt(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
}
- return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
- }
-
- /**
- * Evaluate a predicate with respect to the statistics from the column
- * that is referenced in the predicate.
- * @param stats the statistics for the column mentioned in the predicate
- * @param predicate the leaf predicate we need to evaluation
- * @return the set of truth values that may be returned for the given
- * predicate.
- */
- static TruthValue evaluatePredicate(ColumnStatistics stats,
- PredicateLeaf predicate, BloomFilterIO bloomFilter) {
- Object minValue = getMin(stats);
- Object maxValue = getMax(stats);
- return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
- }
-
- static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
- Object max, boolean hasNull, BloomFilterIO bloomFilter) {
- // if we didn't have any values, everything must have been null
- if (min == null) {
- if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
- return TruthValue.YES;
+ if (vector.noNulls || !vector.isNull[row]) {
+ IntWritable result;
+ if (previous == null || previous.getClass() != IntWritable.class) {
+ result = new IntWritable();
} else {
- return TruthValue.NULL;
+ result = (IntWritable) previous;
}
- } else if (min == UNKNOWN_VALUE) {
- return TruthValue.YES_NO_NULL;
+ result.set((int) ((LongColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
}
+ }
- TruthValue result;
- Object baseObj = predicate.getLiteral();
- try {
- // Predicate object and stats objects are converted to the type of the predicate object.
- Object minValue = getBaseObjectForComparison(predicate.getType(), min);
- Object maxValue = getBaseObjectForComparison(predicate.getType(), max);
- Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj);
-
- result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
- if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
- result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull);
- }
- // in case failed conversion, return the default YES_NO_NULL truth value
- } catch (Exception e) {
- if (LOG.isWarnEnabled()) {
- final String statsType = min == null ?
- (max == null ? "null" : max.getClass().getSimpleName()) :
- min.getClass().getSimpleName();
- final String predicateType = baseObj == null ? "null" : baseObj.getClass().getSimpleName();
- final String reason = e.getClass().getSimpleName() + " when evaluating predicate." +
- " Skipping ORC PPD." +
- " Exception: " + e.getMessage() +
- " StatsType: " + statsType +
- " PredicateType: " + predicateType;
- LOG.warn(reason);
- LOG.debug(reason, e);
- }
- if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) || !hasNull) {
- result = TruthValue.YES_NO;
+ static LongWritable nextLong(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) {
+ LongWritable result;
+ if (previous == null || previous.getClass() != LongWritable.class) {
+ result = new LongWritable();
} else {
- result = TruthValue.YES_NO_NULL;
+ result = (LongWritable) previous;
}
+ result.set(((LongColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
}
- return result;
}
- private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
- TruthValue result, BloomFilterIO bloomFilter) {
- // evaluate bloom filter only when
- // 1) Bloom filter is available
- // 2) Min/Max evaluation yield YES or MAYBE
- // 3) Predicate is EQUALS or IN list
- if (bloomFilter != null
- && result != TruthValue.NO_NULL && result != TruthValue.NO
- && (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS)
- || predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
- || predicate.getOperator().equals(PredicateLeaf.Operator.IN))) {
- return true;
+ static FloatWritable nextFloat(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
}
- return false;
- }
-
- private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj,
- Object minValue,
- Object maxValue,
- boolean hasNull) {
- Location loc;
-
- switch (predicate.getOperator()) {
- case NULL_SAFE_EQUALS:
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.BEFORE || loc == Location.AFTER) {
- return TruthValue.NO;
- } else {
- return TruthValue.YES_NO;
- }
- case EQUALS:
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (minValue.equals(maxValue) && loc == Location.MIN) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- } else if (loc == Location.BEFORE || loc == Location.AFTER) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- case LESS_THAN:
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.AFTER) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- } else if (loc == Location.BEFORE || loc == Location.MIN) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- case LESS_THAN_EQUALS:
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.AFTER || loc == Location.MAX) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- } else if (loc == Location.BEFORE) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- case IN:
- if (minValue.equals(maxValue)) {
- // for a single value, look through to see if that value is in the
- // set
- for (Object arg : predicate.getLiteralList()) {
- predObj = getBaseObjectForComparison(predicate.getType(), arg);
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.MIN) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- }
- }
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- // are all of the values outside of the range?
- for (Object arg : predicate.getLiteralList()) {
- predObj = getBaseObjectForComparison(predicate.getType(), arg);
- loc = compareToRange((Comparable) predObj, minValue, maxValue);
- if (loc == Location.MIN || loc == Location.MIDDLE ||
- loc == Location.MAX) {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- }
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- }
- case BETWEEN:
- List<Object> args = predicate.getLiteralList();
- Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0));
-
- loc = compareToRange((Comparable) predObj1, minValue, maxValue);
- if (loc == Location.BEFORE || loc == Location.MIN) {
- Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1));
-
- Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue);
- if (loc2 == Location.AFTER || loc2 == Location.MAX) {
- return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
- } else if (loc2 == Location.BEFORE) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- } else if (loc == Location.AFTER) {
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- } else {
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
- }
- case IS_NULL:
- // min = null condition above handles the all-nulls YES case
- return hasNull ? TruthValue.YES_NO : TruthValue.NO;
- default:
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ if (vector.noNulls || !vector.isNull[row]) {
+ FloatWritable result;
+ if (previous == null || previous.getClass() != FloatWritable.class) {
+ result = new FloatWritable();
+ } else {
+ result = (FloatWritable) previous;
+ }
+ result.set((float) ((DoubleColumnVector) vector).vector[row]);
+ return result;
+ } else {
+ return null;
}
}
- private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
- final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) {
- switch (predicate.getOperator()) {
- case NULL_SAFE_EQUALS:
- // null safe equals does not return *_NULL variant. So set hasNull to false
- return checkInBloomFilter(bloomFilter, predObj, false);
- case EQUALS:
- return checkInBloomFilter(bloomFilter, predObj, hasNull);
- case IN:
- for (Object arg : predicate.getLiteralList()) {
- // if atleast one value in IN list exist in bloom filter, qualify the row group/stripe
- Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg);
- TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull);
- if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
- return result;
- }
- }
- return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
- default:
- return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+ static DoubleWritable nextDouble(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
}
- }
-
- private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) {
- TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-
- if (predObj instanceof Long) {
- if (bf.testLong(((Long) predObj).longValue())) {
- result = TruthValue.YES_NO_NULL;
- }
- } else if (predObj instanceof Double) {
- if (bf.testDouble(((Double) predObj).doubleValue())) {
- result = TruthValue.YES_NO_NULL;
- }
- } else if (predObj instanceof String || predObj instanceof Text ||
- predObj instanceof HiveDecimalWritable ||
- predObj instanceof BigDecimal) {
- if (bf.testString(predObj.toString())) {
- result = TruthValue.YES_NO_NULL;
- }
- } else if (predObj instanceof Timestamp) {
- if (bf.testLong(((Timestamp) predObj).getTime())) {
- result = TruthValue.YES_NO_NULL;
- }
- } else if (predObj instanceof TimestampWritable) {
- if (bf.testLong(((TimestampWritable) predObj).getTimestamp().getTime())) {
- result = TruthValue.YES_NO_NULL;
- }
- } else if (predObj instanceof Date) {
- if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
- result = TruthValue.YES_NO_NULL;
+ if (vector.noNulls || !vector.isNull[row]) {
+ DoubleWritable result;
+ if (previous == null || previous.getClass() != DoubleWritable.class) {
+ result = new DoubleWritable();
+ } else {
+ result = (DoubleWritable) previous;
}
+ result.set(((DoubleColumnVector) vector).vector[row]);
+ return result;
} else {
- // if the predicate object is null and if hasNull says there are no nulls then return NO
- if (predObj == null && !hasNull) {
- result = TruthValue.NO;
- } else {
- result = TruthValue.YES_NO_NULL;
- }
- }
-
- if (result == TruthValue.YES_NO_NULL && !hasNull) {
- result = TruthValue.YES_NO;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Bloom filter evaluation: " + result.toString());
+ return null;
}
-
- return result;
}
- private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object obj) {
- if (obj == null) {
- return null;
+ static Text nextString(ColumnVector vector,
+ int row,
+ Object previous) {
+ if (vector.isRepeating) {
+ row = 0;
}
- switch (type) {
- case BOOLEAN:
- if (obj instanceof Boolean) {
- return obj;
- } else {
- // will only be true if the string conversion yields "true", all other values are
- // considered false
- return Boolean.valueOf(obj.toString());
- }
- case DATE:
- if (obj instanceof Date) {
- return obj;
- } else if (obj instanceof String) {
- return Date.valueOf((String) obj);
- } else if (obj instanceof Timestamp) {
- return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L);
- }
- // always string, but prevent the comparison to numbers (are they days/seconds/milliseconds?)
- break;
- case DECIMAL:
- if (obj instanceof Boolean) {
- return new HiveDecimalWritable(((Boolean) obj).booleanValue() ?
- HiveDecimal.ONE : HiveDecimal.ZERO);
- } else if (obj instanceof Integer) {
- return new HiveDecimalWritable(((Integer) obj).intValue());
- } else if (obj instanceof Long) {
- return new HiveDecimalWritable(((Long) obj));
- } else if (obj instanceof Float || obj instanceof Double ||
- obj instanceof String) {
- return new HiveDecimalWritable(obj.toString());
- } else if (obj instanceof BigDecimal) {
- return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj));
- } else if (obj instanceof HiveDecimal) {
- return new HiveDecimalWritable((HiveDecimal) obj);
- } else if (obj instanceof HiveDecimalWritable) {
- return obj;
- } else if (obj instanceof Timestamp) {
- return new HiveDecimalWritable(
- new Double(new TimestampWritable((Timestamp) obj).getDouble()).toString());
- }
- break;
- case FLOAT:
- if (obj instanceof Number) {
- // widening conversion
- return ((Number) obj).doubleValue();
- } else if (obj instanceof HiveDecimal) {
- return ((HiveDecimal) obj).doubleValue();
- } else if (obj instanceof String) {
- return Double.valueOf(obj.toString());
- } else if (obj instanceof Timestamp) {
- return new TimestampWritable((Timestamp)obj).getDouble();
- } else if (obj instanceof HiveDecimal) {
- return ((HiveDecimal) obj).doubleValue();
- } else if (obj instanceof BigDecimal) {
- return ((BigDecimal) obj).doubleValue();
- }
- break;
- case LONG:
- if (obj instanceof Number) {
- // widening conversion
- return ((Number) obj).longValue();
- } else if (obj instanceof HiveDecimal) {
- return ((HiveDecimal) obj).longValue();
- } else if (obj instanceof String) {
- return Long.valueOf(obj.toString());
- }
- break;
- case STRING:
- if (obj != null) {
- return (obj.toString());
- }
- break;
- case TIMESTAMP:
- if (obj instanceof Timestamp) {
- return obj;
- } else if (obj instanceof Integer) {
- return TimestampWritable.longToTimestamp(((Number) obj).longValue(), false);
- } else if (obj instanceof Float) {
- return TimestampWritable.doubleToTimestamp(((Float) obj).doubleValue());
- } else if (obj instanceof Double) {
- return TimestampWritable.doubleToTimestamp(((Double) obj).doubleValue());
- } else if (obj instanceof HiveDecimal) {
- return TimestampWritable.decimalToTimestamp((HiveDecimal) obj);
- } else if (obj instanceof HiveDecimalWritable) {
- return TimestampWritable.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal());
- } else if (obj instanceof Date) {
- return new Timestamp(((Date) obj).getTime());
- }
- // float/double conversion to timestamp is interpreted as seconds whereas integer conversion
- // to timestamp is interpreted as milliseconds by default. The integer to timestamp casting
- // is also config driven. The filter operator changes its promotion based on config:
- // "int.timestamp.conversion.in.seconds". Disable PPD for integer cases.
- break;
- default:
- break;
+ if (vector.noNulls || !vector.isNull[row]) {
+ Text result;
+ if (previous == null || previous.getClass() != Text.class) {
+ result = new Text();
+ } else {
+ result = (Text) previous;
+ }
+ BytesColumnVector bytes = (BytesColumnVector) vector;
+ result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+ return result;
+ } else {
+ return null;
}
-
- throw new IllegalArgumentException(String.format(
- "ORC SARGS could not convert from %s to %s", obj == null ? "(null)" : obj.getClass()
- .getSimpleName(), type));
}
- public static class SargApplier {
- public final static boolean[] READ_ALL_RGS = null;
- public final static boolean[] READ_NO_RGS = new boolean[0];
-
- private final SearchArgument sarg;
- private final List<PredicateLeaf> sargLeaves;
- private final int[] filterColumns;
- private final long rowIndexStride;
- // same as the above array, but indices are set to true
- private final boolean[] sargColumns;
-
- public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
- List<OrcProto.Type> types, int includedCount) {
- this.sarg = sarg;
- sargLeaves = sarg.getLeaves();
- filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0);
- this.rowIndexStride = rowIndexStride;
- // included will not be null, row options will fill the array with trues if null
- sargColumns = new boolean[includedCount];
- for (int i : filterColumns) {
- // filter columns may have -1 as index which could be partition column in SARG.
- if (i > 0) {
- sargColumns[i] = true;
- }
- }
+ static HiveCharWritable nextChar(ColumnVector vector, // reads one CHAR cell; the vector is cast to BytesColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ int size, // forwarded to HiveCharWritable.set; nextValue passes schema.getMaxLength()
+ Object previous) { // reused as the result when it is exactly a HiveCharWritable
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
 }
-
- /**
- * Pick the row groups that we need to load from the current stripe.
- *
- * @return an array with a boolean for each row group or null if all of the
- * row groups must be read.
- * @throws IOException
- */
- public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes,
- OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException {
- long rowsInStripe = stripe.getNumberOfRows();
- int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
- boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
- TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
- boolean hasSelected = false, hasSkipped = false;
- for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
- for (int pred = 0; pred < leafValues.length; ++pred) {
- int columnIx = filterColumns[pred];
- if (columnIx != -1) {
- if (indexes[columnIx] == null) {
- throw new AssertionError("Index is not populated for " + columnIx);
- }
- OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup);
- if (entry == null) {
- throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup);
- }
- OrcProto.ColumnStatistics stats = entry.getStatistics();
- OrcProto.BloomFilter bf = null;
- if (bloomFilterIndices != null && bloomFilterIndices[filterColumns[pred]] != null) {
- bf = bloomFilterIndices[filterColumns[pred]].getBloomFilter(rowGroup);
- }
- leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Stats = " + stats);
- LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]);
- }
- } else {
- // the column is a virtual column
- leafValues[pred] = TruthValue.YES_NO_NULL;
- }
- }
- result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
- hasSelected = hasSelected || result[rowGroup];
- hasSkipped = hasSkipped || (!result[rowGroup]);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
- (rowIndexStride * (rowGroup + 1) - 1) + " is " +
- (result[rowGroup] ? "" : "not ") + "included.");
- }
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ HiveCharWritable result;
+ if (previous == null || previous.getClass() != HiveCharWritable.class) { // exact class check, not instanceof
+ result = new HiveCharWritable();
+ } else {
+ result = (HiveCharWritable) previous;
 }
-
- return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS;
+ BytesColumnVector bytes = (BytesColumnVector) vector;
+ result.set(bytes.toString(row), size);
+ return result;
+ } else {
+ return null; // the cell is null; previous is not touched
 }
 }
- /**
- * Pick the row groups that we need to load from the current stripe.
- *
- * @return an array with a boolean for each row group or null if all of the
- * row groups must be read.
- * @throws IOException
- */
- protected boolean[] pickRowGroups() throws IOException {
- // if we don't have a sarg or indexes, we read everything
- if (sargApp == null) {
+ static HiveVarcharWritable nextVarchar(ColumnVector vector, // reads one VARCHAR cell; the vector is cast to BytesColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ int size, // forwarded to HiveVarcharWritable.set; nextValue passes schema.getMaxLength()
+ Object previous) { // reused as the result when it is exactly a HiveVarcharWritable
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ HiveVarcharWritable result;
+ if (previous == null || previous.getClass() != HiveVarcharWritable.class) { // exact class check, not instanceof
+ result = new HiveVarcharWritable();
+ } else {
+ result = (HiveVarcharWritable) previous;
+ }
+ BytesColumnVector bytes = (BytesColumnVector) vector;
+ result.set(bytes.toString(row), size);
+ return result;
+ } else {
 return null;
 }
- readRowIndex(currentStripe, included, sargApp.sargColumns);
- return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
 }
- private void clearStreams() {
- // explicit close of all streams to de-ref ByteBuffers
- for (InStream is : streams.values()) {
- is.close();
+ static BytesWritable nextBinary(ColumnVector vector, // reads one BINARY cell; the vector is cast to BytesColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ Object previous) { // reused as the result when it is exactly a BytesWritable
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
 }
- if (bufferChunks != null) {
- if (dataReader.isTrackingDiskRanges()) {
- for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
- if (!(range instanceof BufferChunk)) {
- continue;
- }
- dataReader.releaseBuffer(((BufferChunk) range).getChunk());
- }
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ BytesWritable result;
+ if (previous == null || previous.getClass() != BytesWritable.class) { // exact class check, not instanceof
+ result = new BytesWritable();
+ } else {
+ result = (BytesWritable) previous;
 }
+ BytesColumnVector bytes = (BytesColumnVector) vector;
+ result.set(bytes.vector[row], bytes.start[row], bytes.length[row]); // the slice is described by the per-row start and length arrays
+ return result;
+ } else {
+ return null; // the cell is null; previous is not touched
 }
- bufferChunks = null;
- streams.clear();
 }
- /**
- * Read the current stripe into memory.
- *
- * @throws IOException
- */
- private void readStripe() throws IOException {
- StripeInformation stripe = beginReadStripe();
- includedRowGroups = pickRowGroups();
-
- // move forward to the first unskipped row
- if (includedRowGroups != null) {
- while (rowInStripe < rowCountInStripe &&
- !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
- rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
- }
+ static HiveDecimalWritable nextDecimal(ColumnVector vector, // reads one DECIMAL cell; the vector is cast to DecimalColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ Object previous) { // reused as the result when it is exactly a HiveDecimalWritable
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
 }
-
- // if we haven't skipped the whole stripe, read the data
- if (rowInStripe < rowCountInStripe) {
- // if we aren't projecting columns or filtering rows, just read it all
- if (included == null && includedRowGroups == null) {
- readAllDataStreams(stripe);
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ HiveDecimalWritable result;
+ if (previous == null || previous.getClass() != HiveDecimalWritable.class) { // exact class check, not instanceof
+ result = new HiveDecimalWritable();
 } else {
- readPartialDataStreams(stripe);
- }
- reader.startStripe(streams, stripeFooter);
- // if we skipped the first row group, move the pointers forward
- if (rowInStripe != 0) {
- seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
+ result = (HiveDecimalWritable) previous;
 }
+ result.set(((DecimalColumnVector) vector).vector[row]); // the value is set into result rather than returning the vector element itself
+ return result;
+ } else {
+ return null; // the cell is null; previous is not touched
 }
 }
- private StripeInformation beginReadStripe() throws IOException {
- StripeInformation stripe = stripes.get(currentStripe);
- stripeFooter = readStripeFooter(stripe);
- clearStreams();
- // setup the position in the stripe
- rowCountInStripe = stripe.getNumberOfRows();
- rowInStripe = 0;
- rowBaseInStripe = 0;
- for (int i = 0; i < currentStripe; ++i) {
- rowBaseInStripe += stripes.get(i).getNumberOfRows();
+ static DateWritable nextDate(ColumnVector vector, // reads one DATE cell; the vector is cast to LongColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ Object previous) { // reused as the result when it is exactly a DateWritable
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
 }
- // reset all of the indexes
- for (int i = 0; i < indexes.length; ++i) {
- indexes[i] = null;
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ DateWritable result;
+ if (previous == null || previous.getClass() != DateWritable.class) { // exact class check, not instanceof
+ result = new DateWritable();
+ } else {
+ result = (DateWritable) previous;
+ }
+ int date = (int) ((LongColumnVector) vector).vector[row]; // the long slot is narrowed to the int taken by DateWritable.set
+ result.set(date);
+ return result;
+ } else {
+ return null; // the cell is null; previous is not touched
 }
- return stripe;
 }
- private void readAllDataStreams(StripeInformation stripe) throws IOException {
- long start = stripe.getIndexLength();
- long end = start + stripe.getDataLength();
- // explicitly trigger 1 big read
- DiskRangeList toRead = new DiskRangeList(start, end);
- bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
- List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
- createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
- }
-
- /**
- * Plan the ranges of the file that we need to read given the list of
- * columns and row groups.
- *
- * @param streamList the list of streams available
- * @param indexes the indexes that have been loaded
- * @param includedColumns which columns are needed
- * @param includedRowGroups which row groups are needed
- * @param isCompressed does the file have generic compression
- * @param encodings the encodings for each column
- * @param types the types of the columns
- * @param compressionSize the compression block size
- * @return the list of disk ranges that will be loaded
- */
- static DiskRangeList planReadPartialDataStreams
- (List<OrcProto.Stream> streamList,
- OrcProto.RowIndex[] indexes,
- boolean[] includedColumns,
- boolean[] includedRowGroups,
- boolean isCompressed,
- List<OrcProto.ColumnEncoding> encodings,
- List<OrcProto.Type> types,
- int compressionSize,
- boolean doMergeBuffers) {
- long offset = 0;
- // figure out which columns have a present stream
- boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
- CreateHelper list = new CreateHelper();
- for (OrcProto.Stream stream : streamList) {
- long length = stream.getLength();
- int column = stream.getColumn();
- OrcProto.Stream.Kind streamKind = stream.getKind();
- // since stream kind is optional, first check if it exists
- if (stream.hasKind() &&
- (StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
- (column < includedColumns.length && includedColumns[column])) {
- // if we aren't filtering or it is a dictionary, load it.
- if (includedRowGroups == null
- || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
- RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
- } else {
- RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
- isCompressed, indexes[column], encodings.get(column), types.get(column),
- compressionSize, hasNull[column], offset, length, list, doMergeBuffers);
- }
+ static TimestampWritable nextTimestamp(ColumnVector vector, // reads one TIMESTAMP cell; the vector is cast to TimestampColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ Object previous) { // reused as the result when it is exactly a TimestampWritable
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ TimestampWritable result;
+ if (previous == null || previous.getClass() != TimestampWritable.class) { // exact class check, not instanceof
+ result = new TimestampWritable();
+ } else {
+ result = (TimestampWritable) previous;
 }
- offset += length;
+ TimestampColumnVector tcv = (TimestampColumnVector) vector;
+ result.setInternal(tcv.time[row], tcv.nanos[row]); // the value comes from the parallel time and nanos arrays
+ return result;
+ } else {
+ return null; // the cell is null; previous is not touched
 }
- return list.extract();
 }
- void createStreams(List<OrcProto.Stream> streamDescriptions,
- DiskRangeList ranges,
- boolean[] includeColumn,
- CompressionCodec codec,
- int bufferSize,
- Map<StreamName, InStream> streams) throws IOException {
- long streamOffset = 0;
- for (OrcProto.Stream streamDesc : streamDescriptions) {
- int column = streamDesc.getColumn();
- if ((includeColumn != null &&
- (column < included.length && !includeColumn[column])) ||
- streamDesc.hasKind() &&
- (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) {
- streamOffset += streamDesc.getLength();
- continue;
+ static OrcStruct nextStruct(ColumnVector vector, // reads one STRUCT cell; the vector is cast to StructColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ TypeDescription schema, // struct type; its children give the field types
+ Object previous) { // reused as the result when it is exactly an OrcStruct
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
+ }
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ OrcStruct result;
+ List<TypeDescription> childrenTypes = schema.getChildren();
+ int numChildren = childrenTypes.size();
+ if (previous == null || previous.getClass() != OrcStruct.class) { // exact class check, not instanceof
+ result = new OrcStruct(numChildren);
+ } else {
+ result = (OrcStruct) previous;
+ result.setNumFields(numChildren); // a reused struct is resized to the field count of this schema
+ }
+ StructColumnVector struct = (StructColumnVector) vector;
+ for(int f=0; f < numChildren; ++f) {
+ result.setFieldValue(f, nextValue(struct.fields[f], row, // each field recurses through nextValue at the same row
+ childrenTypes.get(f), result.getFieldValue(f))); // the current field value is offered for reuse
 }
- List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
- ranges, streamOffset, streamDesc.getLength());
- StreamName name = new StreamName(column, streamDesc.getKind());
- streams.put(name, InStream.create(name.toString(), buffers,
- streamDesc.getLength(), codec, bufferSize));
- streamOffset += streamDesc.getLength();
+ return result;
+ } else {
+ return null; // the cell is null; previous is not touched
 }
 }
- private void readPartialDataStreams(StripeInformation stripe) throws IOException {
- List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
- DiskRangeList toRead = planReadPartialDataStreams(streamList,
- indexes, included, includedRowGroups, codec != null,
- stripeFooter.getColumnsList(), types, bufferSize, true);
- if (LOG.isDebugEnabled()) {
- LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
+ static OrcUnion nextUnion(ColumnVector vector, // reads one UNION cell; the vector is cast to UnionColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ TypeDescription schema, // union type; its children give the variant types
+ Object previous) { // reused as the result when it is exactly an OrcUnion
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
 }
- bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
- if (LOG.isDebugEnabled()) {
- LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ OrcUnion result;
+ List<TypeDescription> childrenTypes = schema.getChildren();
+ if (previous == null || previous.getClass() != OrcUnion.class) { // exact class check, not instanceof
+ result = new OrcUnion();
+ } else {
+ result = (OrcUnion) previous;
+ }
+ UnionColumnVector union = (UnionColumnVector) vector;
+ byte tag = (byte) union.tags[row]; // the tag selects both the child vector and the child type
+ result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag),
+ result.getObject())); // the object currently held by result is offered for reuse
+ return result;
+ } else {
+ return null; // the cell is null; previous is not touched
 }
-
- createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
- }
-
- @Override
- public boolean hasNext() throws IOException {
- return rowInStripe < rowCountInStripe;
 }
- /**
- * Read the next stripe until we find a row that we don't skip.
- *
- * @throws IOException
- */
- private void advanceStripe() throws IOException {
- rowInStripe = rowCountInStripe;
- while (rowInStripe >= rowCountInStripe &&
- currentStripe < stripes.size() - 1) {
- currentStripe += 1;
- readStripe();
+ static ArrayList<Object> nextList(ColumnVector vector, // reads one LIST cell; the vector is cast to ListColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ TypeDescription schema, // list type; child 0 is the element type
+ Object previous) { // reused as the result when it is exactly an ArrayList
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
 }
- }
-
- /**
- * Skip over rows that we aren't selecting, so that the next row is
- * one that we will read.
- *
- * @param nextRow the row we want to go to
- * @throws IOException
- */
- private boolean advanceToNextRow(
- TreeReaderFactory.TreeReader reader, long nextRow, boolean canAdvanceStripe)
- throws IOException {
- long nextRowInStripe = nextRow - rowBaseInStripe;
- // check for row skipping
- if (rowIndexStride != 0 &&
- includedRowGroups != null &&
- nextRowInStripe < rowCountInStripe) {
- int rowGroup = (int) (nextRowInStripe / rowIndexStride);
- if (!includedRowGroups[rowGroup]) {
- while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
- rowGroup += 1;
- }
- if (rowGroup >= includedRowGroups.length) {
- if (canAdvanceStripe) {
- advanceStripe();
- }
- return canAdvanceStripe;
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ ArrayList<Object> result;
+ if (previous == null || previous.getClass() != ArrayList.class) { // exact class check, not instanceof
+ result = new ArrayList<>();
+ } else {
+ result = (ArrayList<Object>) previous;
+ }
+ ListColumnVector list = (ListColumnVector) vector;
+ int length = (int) list.lengths[row]; // number of elements in this list
+ int offset = (int) list.offsets[row]; // index of its first element in list.child
+ result.ensureCapacity(length);
+ int oldLength = result.size();
+ int idx = 0;
+ TypeDescription childType = schema.getChildren().get(0);
+ while (idx < length && idx < oldLength) { // overwrite existing slots, offering each old element for reuse
+ result.set(idx, nextValue(list.child, offset + idx, childType,
+ result.get(idx)));
+ idx += 1;
+ }
+ if (length < oldLength) { // reused list was longer: drop the tail
+ result.subList(length,result.size()).clear();
+ } else if (oldLength < length) { // reused list was shorter: append the remaining elements
+ while (idx < length) {
+ result.add(nextValue(list.child, offset + idx, childType, null));
+ idx += 1;
 }
- nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
 }
+ return result;
+ } else {
+ return null; // the cell is null; previous is not touched
 }
- if (nextRowInStripe >= rowCountInStripe) {
- if (canAdvanceStripe) {
- advanceStripe();
- }
- return canAdvanceStripe;
+ }
+
+ static HashMap<Object,Object> nextMap(ColumnVector vector, // reads one MAP cell; the vector is cast to MapColumnVector below
+ int row, // row to read; forced to 0 when the vector is repeating
+ TypeDescription schema, // map type; child 0 is the key type, child 1 the value type
+ Object previous) { // reused (after clear) as the result when it is exactly a HashMap
+ if (vector.isRepeating) { // a repeating vector is read from slot 0 only
+ row = 0;
 }
- if (nextRowInStripe != rowInStripe) {
- if (rowIndexStride != 0) {
- int rowGroup = (int) (nextRowInStripe / rowIndexStride);
- seekToRowEntry(reader, rowGroup);
- reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+ if (vector.noNulls || !vector.isNull[row]) { // isNull is consulted only when noNulls is false
+ MapColumnVector map = (MapColumnVector) vector;
+ int length = (int) map.lengths[row]; // number of entries in this map
+ int offset = (int) map.offsets[row]; // index of its first entry in map.keys and map.values
+ TypeDescription keyType = schema.getChildren().get(0);
+ TypeDescription valueType = schema.getChildren().get(1);
+ HashMap<Object,Object> result;
+ if (previous == null || previous.getClass() != HashMap.class) { // exact class check, not instanceof
+ result = new HashMap<Object,Object>(length);
 } else {
- reader.skipRows(nextRowInStripe - rowInStripe);
+ result = (HashMap<Object,Object>) previous;
+ // I couldn't think of a good way to reuse the keys and value objects
+ // without even more allocations, so take the easy and safe approach.
+ result.clear();
 }
- rowInStripe = nextRowInStripe;
+ for(int e=0; e < length; ++e) {
+ result.put(nextValue(map.keys, e + offset, keyType, null), // keys and values are always freshly allocated (previous is null)
+ nextValue(map.values, e + offset, valueType, null));
+ }
+ return result;
+ } else {
+ return null; // the cell is null; previous is not touched
 }
- return true;
 }
- @Override
- public Object next(Object previous) throws IOException {
- try {
- final Object result = reader.next(previous);
- // find the next row
- rowInStripe += 1;
- advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
- return result;
- } catch (IOException e) {
- // Rethrow exception with file name in log message
- throw new IOException("Error reading file: " + path, e);
+ static Object nextValue(ColumnVector vector, // column vector that holds the value
+ int row, // row to read
+ TypeDescription schema, // type of the value; its category selects the reader below
+ Object previous) { // object offered for reuse; passed unchanged to the selected reader
+ switch (schema.getCategory()) {
+ case BOOLEAN:
+ return nextBoolean(vector, row, previous);
+ case BYTE:
+ return nextByte(vector, row, previous);
+ case SHORT:
+ return nextShort(vector, row, previous);
+ case INT:
+ return nextInt(vector, row, previous);
+ case LONG:
+ return nextLong(vector, row, previous);
+ case FLOAT:
+ return nextFloat(vector, row, previous);
+ case DOUBLE:
+ return nextDouble(vector, row, previous);
+ case STRING:
+ return nextString(vector, row, previous);
+ case CHAR:
+ return nextChar(vector, row, schema.getMaxLength(), previous); // CHAR and VARCHAR additionally receive the maximum length of the type
+ case VARCHAR:
+ return nextVarchar(vector, row, schema.getMaxLength(), previous);
+ case BINARY:
+ return nextBinary(vector, row, previous);
+ case DECIMAL:
+ return nextDecimal(vector, row, previous);
+ case DATE:
+ return nextDate(vector, row, previous);
+ case TIMESTAMP:
+ return nextTimestamp(vector, row, previous);
+ case STRUCT:
+ return nextStruct(vector, row, schema, previous); // compound readers need the schema to recurse into their children
+ case UNION:
+ return nextUnion(vector, row, schema, previous);
+ case LIST:
+ return nextList(vector, row, schema, previous);
+ case MAP:
+ return nextMap(vector, row, schema, previous);
+ default:
+ throw new IllegalArgumentException("Unknown type " + schema); // category without a reader above
 }
 }
- @Override
- public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
- try {
- if (rowInStripe >= rowCountInStripe) {
- currentStripe += 1;
- if (currentStripe >= stripes.size()) {
- batch.size = 0;
- return false;
+ /* Routines for copying between VectorizedRowBatches */
+
+ void copyLongColumn(ColumnVector destination, // cast to LongColumnVector; filled starting at row 0
+ ColumnVector source, // cast to LongColumnVector; read starting at sourceOffset
+ int sourceOffset, // first source row to copy (unused when the source is repeating)
+ int length) { // number of rows to copy
+ LongColumnVector lsource = (LongColumnVector) source;
+ LongColumnVector ldest = (LongColumnVector) destination;
+ ldest.isRepeating = lsource.isRepeating; // both flags are mirrored unconditionally
+ ldest.noNulls = lsource.noNulls;
+ if (source.isRepeating) { // repeating: only slot 0 is copied
+ ldest.isNull[0] = lsource.isNull[0];
+ ldest.vector[0] = lsource.vector[0];
+ } else {
+ if (!lsource.noNulls) { // nulls present: copy the null mask together with the values
+ for(int r=0; r < length; ++r) {
+ ldest.isNull[r] = lsource.isNull[sourceOffset + r];
+ ldest.vector[r] = lsource.vector[sourceOffset + r];
+ }
+ } else { // no nulls: only values are copied, destination isNull entries are left as they are
+ for (int r = 0; r < length; ++r) {
+ ldest.vector[r] = lsource.vector[sourceOffset + r];
+ }
- readStripe();
 }
-
- int batchSize = computeBatchSize(batch.getMaxSize());
-
- rowInStripe += batchSize;
- reader.setVectorColumnCount(batch.getDataColumnCount());
- reader.nextBatch(batch, batchSize);
-
- batch.size = (int) batchSize;
- batch.selectedInUse = false;
- advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
- return batch.size != 0;
- } catch (IOException e) {
- // Rethrow exception with file name in log message
- throw new IOException("Error reading file: " + path, e);
 }
 }
- private int computeBatchSize(long targetBatchSize) {
- final int batchSize;
- // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
- // groups are selected then marker position is set to the end of range (subset of row groups
- // within strip). Batch size computed out of marker position makes sure that batch size is
- // aware of row group boundary and will not cause overflow when reading rows
- // illustration of this case is here https://issues.apache.org/jira/browse/HIVE-6287
- if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
- int startRowGroup = (int) (rowInStripe / rowIndexStride);
- if (!includedRowGroups[startRowGroup]) {
- while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
- startRowGroup += 1;
+ void copyDoubleColumn(ColumnVector destination, // cast to DoubleColumnVector; filled starting at row 0
+ ColumnVector source, // cast to DoubleColumnVector; read starting at sourceOffset
+ int sourceOffset, // first source row to copy (unused when the source is repeating)
+ int length) { // number of rows to copy
+ DoubleColumnVector castedSource = (DoubleColumnVector) source;
+ DoubleColumnVector castedDestination = (DoubleColumnVector) destination;
+ castedDestination.isRepeating = castedSource.isRepeating; // mirror both flags on every call so values left by an earlier copy cannot survive
+ castedDestination.noNulls = castedSource.noNulls;
+ if (source.isRepeating) { // repeating: only slot 0 is copied
+ castedDestination.isNull[0] = castedSource.isNull[0];
+ castedDestination.vector[0] = castedSource.vector[0];
+ } else {
+ if (!castedSource.noNulls) {
+ // Only the null mask needs its own pass; the values are copied by the loop below.
+ for(int r=0; r < length; ++r) {
+ castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
 }
 }
-
- int endRowGroup = startRowGroup;
- while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
- endRowGroup += 1;
- }
-
- final long markerPosition =
- (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
- : rowCountInStripe;
- batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
-
- if (isLogDebugEnabled && batchSize < targetBatchSize) {
- LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
+ for(int r=0; r < length; ++r) {
+ castedDestination.vector[r] = castedSource.vector[sourceOffset + r];
 }
- } else {
- batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
 }
- return batchSize;
- }
-
- @Override
- public void close() throws IOException {
- clearStreams();
- dataReader.close();
- }
-
- @Override
- public long getRowNumber() {
- return rowInStripe + rowBaseInStripe + firstRow;
- }
-
- /**
- * Return the fraction of rows that have been read from the selected.
- * section of the file
- *
- * @return fraction between 0.0 and 1.0 of rows consumed
- */
- @Override
- public float getProgress() {
- return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
 }
- private int findStripe(long rowNumber) {
- for (int i = 0; i < stripes.size(); i++) {
- StripeInformation stripe = stripes.get(i);
- if (stripe.getNumberOfRows() > rowNumber) {
- return i;
+ void copyTimestampColumn(ColumnVector destination, // cast to TimestampColumnVector; filled starting at row 0
+ ColumnVector source, // cast to TimestampColumnVector; read starting at sourceOffset
+ int sourceOffset, // first source row to copy (unused when the source is repeating)
+ int length) { // number of rows to copy
+ TimestampColumnVector castedSource = (TimestampColumnVector) source;
+ TimestampColumnVector castedDestination = (TimestampColumnVector) destination;
+ castedDestination.isRepeating = castedSource.isRepeating; // both flags are mirrored unconditionally
+ castedDestination.noNulls = castedSource.noNulls;
+ if (source.isRepeating) { // repeating: only slot 0 is copied
+ castedDestination.isNull[0] = castedSource.isNull[0];
+ castedDestination.time[0] = castedSource.time[0];
+ castedDestination.nanos[0] = castedSource.nanos[0];
+ } else {
+ if (!castedSource.noNulls) {
+ // noNulls already mirrors the source (false here) and must stay false so readers honor isNull.
+ for(int r=0; r < length; ++r) {
+ castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+ castedDestination.time[r] = castedSource.time[sourceOffset + r];
+ castedDestination.nanos[r] = castedSource.nanos[sourceOffset + r];
+ }
+ } else { // no nulls: only the time and nanos arrays are copied
+ for (int r = 0; r < length; ++r) {
+ castedDestination.time[r] = castedSource.time[sourceOffset + r];
+ castedDestination.nanos[r] = castedSource.nanos[sourceOffset + r];
+ }
 }
- rowNumber -= stripe.getNumberOfRows();
 }
- throw new IllegalArgumentException("Seek after the end of reader range");
 }
- OrcIndex readRowIndex(
- int stripeIndex, boolean[] included, boolean[] sargColumns) throws IOException {
- return readRowIndex(stripeIndex, included, null, null, sargColumns);
+ void copyDecimalColumn(ColumnVector destination, // cast to DecimalColumnVector; filled starting at row 0
+ ColumnVector source, // cast to DecimalColumnVector; read starting at sourceOffset
+ int sourceOffset, // first source row to copy (unused when the source is repeating)
+ int length) { // number of rows to copy
+ DecimalColumnVector castedSource = (DecimalColumnVector) source;
+ DecimalColumnVector castedDestination = (DecimalColumnVector) destination;
+ castedDestination.isRepeating = castedSource.isRepeating; // both flags are mirrored unconditionally
+ castedDestination.noNulls = castedSource.noNulls;
+ if (source.isRepeating) { // repeating: only slot 0 is copied
+ castedDestination.isNull[0] = castedSource.isNull[0];
+ if (!castedSource.isNull[0]) { // the value of a null slot is not copied
+ castedDestination.set(0, castedSource.vector[0]);
+ }
+ } else {
+ if (!castedSource.noNulls) {
+ for(int r=0; r < length; ++r) {
+ castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+ if (!castedDestination.isNull[r]) { // the value of a null row is not copied
+ castedDestination.set(r, castedSource.vector[sourceOffset + r]); // destination row r comes from source row sourceOffset + r, like the null mask
+ }
+ }
+ } else {
+ for (int r = 0; r < length; ++r) {
+ castedDestination.set(r, castedSource.vector[sourceOffset + r]);
+ }
+ }
+ }
 }
- OrcIndex readRowIndex(int stripeIndex, boolean[] included, OrcProto.RowIndex[] indexes,
- OrcProto.BloomFilterIndex[] bloomFilterIndex, boolean[] sargColumns) throws IOException {
- StripeInformation stripe = stripes.get(stripeIndex);
- OrcProto.StripeFooter stripeFooter = null;
- // if this is the current stripe, use the cached objects.
- if (stripeIndex == currentStripe) {
- stripeFooter = this.stripeFooter;
- indexes = indexes == null ? this.indexes : indexes;
- bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
- sargColumns = sargColumns == null ?
- (sargApp == null ? null : sargApp.sargColumns) : sargColumns;
+ void copyBytesColumn(ColumnVector destination, // cast to BytesColumnVector; filled starting at row 0
+ ColumnVector source, // cast to BytesColumnVector; read starting at sourceOffset
+ int sourceOffset, // first source row to copy (unused when the source is repeating)
+ int length) { // number of rows to copy
+ BytesColumnVector castedSource = (BytesColumnVector) source;
+ BytesColumnVector castedDestination = (BytesColumnVector) destination;
+ castedDestination.isRepeating = castedSource.isRepeating; // both flags are mirrored unconditionally
+ castedDestination.noNulls = castedSource.noNulls;
+ if (source.isRepeating) { // repeating: only slot 0 is copied
+ castedDestination.isNull[0] = castedSource.isNull[0];
+ if (!castedSource.isNull[0]) { // the bytes of a null slot are not copied
+ castedDestination.setVal(0, castedSource.vector[0],
+ castedSource.start[0], castedSource.length[0]);
+ }
+ } else {
+ if (!castedSource.noNulls) { // nulls present: copy the mask and only the non-null values
+ for(int r=0; r < length; ++r) {
+ castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+ if (!castedDestination.isNull[r]) {
+ castedDestination.setVal(r, castedSource.vector[sourceOffset + r],
+ castedSource.start[sourceOffset + r],
+ castedSource.length[sourceOffset + r]);
+ }
+ }
+ } else { // no nulls: every row is copied, destination isNull entries are left as they are
+ for (int r = 0; r < length; ++r) {
+ castedDestination.setVal(r, castedSource.vector[sourceOffset + r],
+ castedSource.start[sourceOffset + r],
+ castedSource.length[sourceOffset + r]);
+ }
+ }
 }
- return dataReader.readRowIndex(stripe, stripeFooter, included, indexes,
- sargColumns, bloomFilterIndex);
 }
- private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
- throws IOException {
- PositionProvider[] index = new PositionProvider[indexes.length];
- for (int i = 0; i < indexes.length; ++i) {
- if (indexes[i] != null) {
- index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+ void copyStructColumn(ColumnVector destination, // cast to StructColumnVector; filled starting at row 0
+ ColumnVector source, // cast to StructColumnVector; read starting at sourceOffset
+ int sourceOffset, // first source row to copy (unused when the source is repeating)
+ int length) { // number of rows to copy
+ StructColumnVector castedSource = (StructColumnVector) source;
+ StructColumnVector castedDestination = (StructColumnVector) destination;
+ castedDestination.isRepeating = castedSource.isRepeating; // both flags are mirrored unconditionally
+ castedDestination.noNulls = castedSource.noNulls;
+ if (source.isRepeating) { // repeating: row 0 of the struct and of every field is copied
+ castedDestination.isNull[0] = castedSource.isNull[0];
+ for(int c=0; c < castedSource.fields.length; ++c) {
+ copyColumn(castedDestination.fields[c], castedSource.fields[c], 0, 1);
+ }
+ } else {
+ if (!castedSource.noNulls) {
+ for (int r = 0; r < length; ++r) {
+ castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+ }
+ }
+ // Field vectors are copied whether or not the struct has nulls; non-null rows still need their children.
+ for (int c = 0; c < castedSource.fields.length; ++c) {
+ copyColumn(castedDestination.fields[c], castedSource.fields[c],
+ sourceOffset, length);
 }
 }
- reader.seek(index);
 }
- @Override
- public void seekToRow(long rowNumber) throws IOException {
- if (rowNumber < 0) {
- throw new IllegalArgumentException("Seek to a negative row number " +
- rowNumber);
- } else if (rowNumber < firstRow) {
- throw new IllegalArgumentException("Seek before reader range " +
- rowNumber);
+ void copyUnionColumn(ColumnVector destination, // cast to UnionColumnVector; filled starting at row 0
+ ColumnVector source, // cast to UnionColumnVector; read starting at sourceOffset
+ int sourceOffset, // first source row to copy (unused when the source is repeating)
+ int length) { // number of rows to copy
+ UnionColumnVector castedSource = (UnionColumnVector) source;
+ UnionColumnVector castedDestination = (UnionColumnVector) destination;
+ castedDestination.isRepeating = castedSource.isRepeating; // both flags are mirrored unconditionally
+ castedDestination.noNulls = castedSource.noNulls;
+ if (source.isRepeating) { // repeating: only row 0 and the variant selected by its tag are copied
+ castedDestination.isNull[0] = castedSource.isNull[0];
+ int tag = castedSource.tags[0];
+ castedDestination.tags[0] = tag;
+ if (!castedDestination.isNull[0]) {
+ copyColumn(castedDestination.fields[tag], castedSource.fields[tag], 0,
+ 1);
+ }
+ } else {
+ if (!castedSource.noNulls) { // nulls present: copy the null mask together with the tags
+ for (int r = 0; r < length; ++r) {
+ castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+ castedDestination.tags[r] = castedSource.tags[sourceOffset + r];
+ }
+ } else {
+ for(int r=0; r < length; ++r) {
+ castedDestination.tags[r] = castedSource.tags[sourceOffset + r];
+ }
+ }
+ for(int c=0; c < castedSource.fields.length; ++c) { // every variant vector is copied for the same row range
+ copyColumn(castedDestination.fields[c], castedSource.fields[c],
+ sourceOffset, length);
+ }
 }
- // convert to our internal form (rows from the beginning of slice)
- rowNumber -= firstRow;
+ }
- // move to the right stripe
- int rightStripe = findStripe(rowNumber);
- if (rightStripe != currentStripe) {
- currentStripe = rightStripe;
- readStripe();
+  /**
+   * Copy a range of rows from one list column vector into another.
+   * Rows are read from the source starting at sourceOffset and written to
+   * the destination starting at row 0. The child elements referenced by the
+   * copied rows are copied to the start of the destination child vector and
+   * the destination offsets are rebased to match.
+   * @param destination the vector to copy into; must be a ListColumnVector
+   * @param source the vector to copy from; must be a ListColumnVector
+   * @param sourceOffset the first source row to copy
+   * @param length the number of rows to copy
+   */
+  void copyListColumn(ColumnVector destination,
+                      ColumnVector source,
+                      int sourceOffset,
+                      int length) {
+    ListColumnVector castedSource = (ListColumnVector) source;
+    ListColumnVector castedDestination = (ListColumnVector) destination;
+    // This used to copy noNulls into isRepeating.
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    boolean mayHaveNulls = !castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      castedDestination.offsets[0] = 0;
+      if (mayHaveNulls && castedSource.isNull[0]) {
+        // A null list has no elements to copy.
+        castedDestination.lengths[0] = 0;
+        castedDestination.childCount = 0;
+      } else {
+        int childLength = (int) castedSource.lengths[0];
+        castedDestination.lengths[0] = childLength;
+        castedDestination.childCount = childLength;
+        copyColumn(castedDestination.child, castedSource.child,
+            (int) castedSource.offsets[0], childLength);
+      }
+    } else {
+      if (mayHaveNulls) {
+        for (int r = 0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+        }
+      }
+      // First pass: find the span [minOffset, maxOffset) of child elements
+      // referenced by the non-null rows. Null rows are skipped because
+      // their offsets/lengths are presumably not maintained by the reader
+      // (TODO confirm) and could otherwise widen the span arbitrarily.
+      int minOffset = Integer.MAX_VALUE;
+      int maxOffset = Integer.MIN_VALUE;
+      for (int r = 0; r < length; ++r) {
+        if (mayHaveNulls && castedSource.isNull[sourceOffset + r]) {
+          continue;
+        }
+        int childOffset = (int) castedSource.offsets[sourceOffset + r];
+        int childLength = (int) castedSource.lengths[sourceOffset + r];
+        minOffset = Math.min(minOffset, childOffset);
+        maxOffset = Math.max(maxOffset, childOffset + childLength);
+      }
+      // Second pass: the child span is copied to index 0 of the destination
+      // child, so every offset has to be shifted down by minOffset.
+      for (int r = 0; r < length; ++r) {
+        if (mayHaveNulls && castedSource.isNull[sourceOffset + r]) {
+          castedDestination.offsets[r] = 0;
+          castedDestination.lengths[r] = 0;
+        } else {
+          castedDestination.offsets[r] =
+              (int) castedSource.offsets[sourceOffset + r] - minOffset;
+          castedDestination.lengths[r] =
+              (int) castedSource.lengths[sourceOffset + r];
+        }
+      }
+      if (minOffset < maxOffset) {
+        // maxOffset is an exclusive end, so the count is the plain
+        // difference (the previous "+ 1" copied one element too many).
+        // NOTE(review): assumes the destination child has room for
+        // childCount elements - confirm against the caller.
+        castedDestination.childCount = maxOffset - minOffset;
+        copyColumn(castedDestination.child, castedSource.child,
+            minOffset, castedDestination.childCount);
+      } else {
+        castedDestination.childCount = 0;
+      }
+    }
+  }
+
+  /**
+   * Copy a range of rows from one map column vector into another.
+   * Rows are read from the source starting at sourceOffset and written to
+   * the destination starting at row 0. The key and value elements
+   * referenced by the copied rows are copied to the start of the
+   * destination key and value vectors and the destination offsets are
+   * rebased to match.
+   * @param destination the vector to copy into; must be a MapColumnVector
+   * @param source the vector to copy from; must be a MapColumnVector
+   * @param sourceOffset the first source row to copy
+   * @param length the number of rows to copy
+   */
+  void copyMapColumn(ColumnVector destination,
+                     ColumnVector source,
+                     int sourceOffset,
+                     int length) {
+    MapColumnVector castedSource = (MapColumnVector) source;
+    MapColumnVector castedDestination = (MapColumnVector) destination;
+    // This used to copy noNulls into isRepeating.
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    boolean mayHaveNulls = !castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      castedDestination.offsets[0] = 0;
+      if (mayHaveNulls && castedSource.isNull[0]) {
+        // A null map has no entries to copy.
+        castedDestination.lengths[0] = 0;
+        castedDestination.childCount = 0;
+      } else {
+        int childOffset = (int) castedSource.offsets[0];
+        int childLength = (int) castedSource.lengths[0];
+        castedDestination.lengths[0] = childLength;
+        castedDestination.childCount = childLength;
+        copyColumn(castedDestination.keys, castedSource.keys,
+            childOffset, childLength);
+        copyColumn(castedDestination.values, castedSource.values,
+            childOffset, childLength);
+      }
+    } else {
+      if (mayHaveNulls) {
+        for (int r = 0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+        }
+      }
+      // First pass: find the span [minOffset, maxOffset) of entries
+      // referenced by the non-null rows. Null rows are skipped because
+      // their offsets/lengths are presumably not maintained by the reader
+      // (TODO confirm) and could otherwise widen the span arbitrarily.
+      int minOffset = Integer.MAX_VALUE;
+      int maxOffset = Integer.MIN_VALUE;
+      for (int r = 0; r < length; ++r) {
+        if (mayHaveNulls && castedSource.isNull[sourceOffset + r]) {
+          continue;
+        }
+        int childOffset = (int) castedSource.offsets[sourceOffset + r];
+        int childLength = (int) castedSource.lengths[sourceOffset + r];
+        minOffset = Math.min(minOffset, childOffset);
+        maxOffset = Math.max(maxOffset, childOffset + childLength);
+      }
+      // Second pass: the entry span is copied to index 0 of the destination
+      // keys/values, so every offset has to be shifted down by minOffset.
+      for (int r = 0; r < length; ++r) {
+        if (mayHaveNulls && castedSource.isNull[sourceOffset + r]) {
+          castedDestination.offsets[r] = 0;
+          castedDestination.lengths[r] = 0;
+        } else {
+          castedDestination.offsets[r] =
+              (int) castedSource.offsets[sourceOffset + r] - minOffset;
+          castedDestination.lengths[r] =
+              (int) castedSource.lengths[sourceOffset + r];
+        }
+      }
+      if (minOffset < maxOffset) {
+        // maxOffset is an exclusive end, so the count is the plain
+        // difference (the previous "+ 1" copied one entry too many).
+        // NOTE(review): assumes the destination keys/values have room for
+        // childCount entries - confirm against the caller.
+        castedDestination.childCount = maxOffset - minOffset;
+        copyColumn(castedDestination.keys, castedSource.keys,
+            minOffset, castedDestination.childCount);
+        copyColumn(castedDestination.values, castedSource.values,
+            minOffset, castedDestination.childCount);
+      } else {
+        castedDestination.childCount = 0;
+      }
     }
- readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns);
-
- // if we aren't to the right row yet, advance in the stripe.
- advanceToNextRow(reader, rowNumber, true);
   }
- private static final String TRANSLATED_SARG_SEPARATOR = "_";
- public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) {
- return rootColumn + TRANSLATED_SARG_SEPARATOR
- + ((indexInSourceTable == null) ? -1 : indexInSourceTable);
+ void copyColumn(ColumnVector destination,
+ ColumnVector source,
+ int sourceOffset,
+ int length) {
+ if (source.getClass() == LongColumnVector.class) {
+ copyLongColumn(destination, source, sourceOffset, length);
+ } else if (source.getClass() == DoubleColumnVector.class) {
+ copyDoubleColumn(destination, source, sourceOffset, length);
+ } else if (source.getClass() == BytesColumnVector.class) {
+ copyBytesColumn(destination, source, sourceOffset, length);
+ } else if (source.getClass() == TimestampColumnVector.class) {
+ copyTimestampColumn(destination, source, sourceOffset, length);
+ } else if (source.getClass() == DecimalColumnVector.class) {
+ copyDecimalColumn(destination, source, sourceOffset, length);
+ } else if (source.getClass() == StructColumnVector.class) {
+ copyStructColumn(destination, source, sourceOffset, length);
+ } else if (source.getClass() == UnionColumnVector.class) {
+ copyUnionColumn(destination, source, sourceOffset, length);
+ } else if (source.getClass() == ListColumnVector.class) {
+ copyListColumn(destination, source, sourceOffset, length);
+ } else if (source.getClass() == MapColumnVector.class) {
+ copyMapColumn(destination, source, sourceOffset, length);
+ }
}
- public static int[] mapTranslatedSargColumns(
- List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
- int[] result = new int[sargLeaves.size()];
- OrcProto.Type lastRoot = null; // Root will be the same for everyone as of now.
- String lastRootStr = null;
- for (int i = 0; i < result.length; ++i) {
- String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
- assert rootAndIndex.length == 2;
- String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
- int index = Integer.parseInt(indexStr);
- // First, check if the column even maps to anything.
- if (index == -1) {
- result[i] = -1;
- continue;
- }
- assert index >= 0;
- // Then, find the root type if needed.
- if (!rootStr.equals(lastRootStr)) {
- lastRoot = types.get(Integer.parseInt(rootStr));
- lastRootStr = rootStr;
- }
- // Subtypes of the root types correspond, in order, to the columns in the table schema
- // (disregarding schema evolution that doesn't presently work). Get the index for the
- // corresponding subtype.
- result[i] = lastRoot.getSubtypes(index);
- }
- return result;
+  /**
+   * Copy part of a batch into the destination batch.
+   * Copies as many rows as remain in the source after sourceStart, limited
+   * to the destination's maximum size, and sets the destination's size to
+   * the number of rows copied. Each destination column is reset before it
+   * is filled.
+   * @param destination the batch to copy into
+   * @param source the batch to copy from
+   * @param sourceStart the row number to start from in the source
+   */
+  void copyIntoBatch(VectorizedRowBatch destination,
+                     VectorizedRowBatch source,
+                     int sourceStart) {
+    // Clamp at zero so that a start position past the end of the source
+    // yields an empty batch instead of a negative size.
+    int rows = Math.max(0,
+        Math.min(source.size - sourceStart, destination.getMaxSize()));
+    for (int c = 0; c < source.cols.length; ++c) {
+      destination.cols[c].reset();
+      copyColumn(destination.cols[c], source.cols[c], sourceStart, rows);
+    }
+    destination.size = rows;
   }
}